diff options
Diffstat (limited to 'rpkid/rpki/https.py')
-rw-r--r-- | rpkid/rpki/https.py | 250 |
1 files changed, 188 insertions, 62 deletions
diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index 9a86661a..60f50909 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -40,30 +40,23 @@ import time, socket, asyncore, asynchat, traceback, urlparse import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log import POW -import os -if os.getlogin() != "sra": - # - # Have to keep this warning for now, but it has long since become - # tedious while testing other code I have to finish first. - # - print "====== WARNING WARNING WARNING ======" - print "THIS VERSION OF rpki.https DOES NOT SUPPORT TLS." - print "CONNECTIONS ARE NOT SECURE." - print "THIS IS A DEVELOPMENT VERSION, TLS WILL BE ADDED LATER." - print "====== WARNING WARNING WARNING ======" - rpki_content_type = "application/x-rpki" # ================================================================ -debug = False +# Chatter about TLS certificates +debug_tls_certs = True + +# Verbose chatter about HTTP streams +debug = True +# Whether we want persistent HTTP streams, when peer also supports them want_persistent_client = True want_persistent_server = True -idle_timeout_default = rpki.sundial.timedelta(seconds = 90) -active_timeout_default = idle_timeout_default +# Default HTTP connection timeout (set very short for initial testing) +default_timeout = rpki.sundial.timedelta(seconds = 90) default_http_version = (1, 0) @@ -181,33 +174,30 @@ def logger(self, msg): class http_stream(asynchat.async_chat): log = logger + tls = None + retry_read = None + retry_write = None - idle_timeout = idle_timeout_default - active_timeout = active_timeout_default + timeout = default_timeout - def __init__(self, conn = None, cert = None, key = None, ta = None, dynamic_ta = None): + def __init__(self, conn = None): asynchat.async_chat.__init__(self, conn = conn) self.buffer = [] self.timer = rpki.async.timer(self.handle_timeout) - self.cert = cert - self.key = key - self.ta = ta - self.dynamic_ta = dynamic_ta self.restart() - def restart(self, idle = True): + def restart(self): assert not self.buffer self.chunk_handler = None self.set_terminator("\r\n\r\n") - timeout = self.idle_timeout if idle else self.active_timeout - if timeout is not None: - self.timer.set(timeout) + if self.timeout is not None: + self.timer.set(self.timeout) else: self.timer.cancel() - def update_active_timeout(self): - if self.active_timeout is not None: - self.timer.set(self.active_timeout) + def update_timeout(self): + if self.timeout is not None: + self.timer.set(self.timeout) else: self.timer.cancel() @@ -216,7 +206,7 @@ class http_stream(asynchat.async_chat): Buffer the data """ self.buffer.append(data) - self.update_active_timeout() + self.update_timeout() def get_buffer(self): val = "".join(self.buffer) @@ -224,7 +214,7 @@ class http_stream(asynchat.async_chat): return val def found_terminator(self): - self.update_active_timeout() + self.update_timeout() if self.chunk_handler: self.chunk_handler() elif not isinstance(self.get_terminator(), str): @@ -275,8 +265,9 @@ class http_stream(asynchat.async_chat): self.handle_message() def handle_error(self): + self.log("Error in HTTP stream handler") print traceback.format_exc() - self.log("Error in HTTP stream handler, closing") + self.log("Closing due to error") self.close() def handle_timeout(self): @@ -285,19 +276,121 @@ class http_stream(asynchat.async_chat): def handle_close(self): self.log("Close event in HTTP stream handler") - asynchat.async_chat.handle_close(self) self.timer.cancel() + def send(self, data): + assert self.retry_read is None and self.retry_write is None + return self.tls.write(data) + + def recv(self, buffer_size): + assert self.retry_read is None and self.retry_write is None + return self.tls.read(buffer_size) + + def readable(self): + return self.retry_read is not None or (self.retry_write is None and asynchat.async_chat.readable(self)) + + def writeable(self): + return self.retry_write is not None or (self.retry_read is None and asynchat.async_chat.writeable(self)) + + def handle_read(self): + assert self.retry_write is None + if self.retry_read is not None: + thunk = self.retry_read + self.retry_read = None + self.log("Retrying TLS read %r" % thunk) + thunk() + else: + try: + asynchat.async_chat.handle_read(self) + except POW.WantReadError: + self.retry_read = self.handle_read + except POW.WantWriteError: + self.retry_write = self.handle_read + except POW.ZeroReturnError: + self.log("ZeroReturn in handle_read()") + self.close() + except POW.SSLUnexpectedEOFError: + self.log("SSLUnexpectedEOF in handle_read()") + self.close() + + def handle_write(self): + assert self.retry_read is None + if self.retry_write is not None: + thunk = self.retry_write + self.retry_write = None + thunk() + self.log("Retrying TLS write %r" % thunk) + else: + asynchat.async_chat.handle_write(self) + + def initate_send(self): + assert self.retry_read is None and self.retry_write is None + try: + asynchat.async_chat.initiate_send(self) + except POW.WantReadError: + self.retry_read = self.initiate_send + except POW.WantWriteError: + self.retry_write = self.initiate_send + except POW.ZeroReturnError: + self.log("ZeroReturn in initiate_send()") + self.close() + except POW.SSLUnexpectedEOFError: + self.log("SSLUnexpectedEOF in initiate_send()") + self.close() + + def close(self): + self.log("Close requested") + assert self.retry_read is None and self.retry_write is None + if self.tls is not None: + try: + ret = self.tls.shutdown() + self.log("tls.shutdown() returned %d" % ret) + self.tls = None + asynchat.async_chat.close(self) + self.handle_close() + except POW.WantReadError: + self.retry_read = self.close + except POW.WantWriteError: + self.retry_write = self.close + class http_server(http_stream): parse_type = http_request - def __init__(self, conn, handlers, cert = None, key = None, ta = None, dynamic_ta = None): + def __init__(self, conn, handlers, cert = None, key = None, ta = (), dynamic_ta = None): self.log("Starting") self.handlers = handlers - http_stream.__init__(self, conn = conn, cert = cert, key = key, ta = ta, dynamic_ta = dynamic_ta) + http_stream.__init__(self, conn = conn) self.expect_close = not want_persistent_server + self.log("cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta)) + + self.tls = POW.Ssl(POW.TLSV1_SERVER_METHOD) + if debug_tls_certs: + self.log("HTTPS server cert issuer %s [%s] subject %s [%s]" % (cert.getIssuer(), cert.hAKI(), cert.getSubject(), cert.hSKI())) + self.tls.useCertificate(cert.get_POW()) + self.tls.useKey(key.get_POW()) + ta = set(dynamic_ta() if dynamic_ta else ta) + ta.discard(None) + if not ta: + raise RuntimeError, "No trust anchor(s) specified, this is unlikely to work" + for x in ta: + if debug_tls_certs: + self.log("HTTPS trusted cert issuer %s [%s] subject %s [%s]" % (x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI())) + self.tls.trustCertificate(x.get_POW()) + self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT) + + self.tls.setFd(self.fileno()) + self.tls_accept() + + def tls_accept(self): + try: + self.tls.accept() + except POW.WantReadError: + self.retry_read = self.tls_accept + except POW.WantWriteError: + self.retry_write = self.tls_accept + def handle_no_content_length(self): self.handle_message() @@ -325,7 +418,7 @@ class http_server(http_stream): if error is None: try: handler(self.msg.body, self.msg.path, self.send_reply) - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except Exception, edata: print traceback.format_exc() @@ -358,6 +451,7 @@ class http_listener(asyncore.dispatcher): log = logger def __init__(self, handlers, port = 80, host = "", cert = None, key = None, ta = None, dynamic_ta = None): + self.log("Listener cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta)) asyncore.dispatcher.__init__(self) self.handlers = handlers self.cert = cert @@ -370,7 +464,7 @@ class http_listener(asyncore.dispatcher): self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self.bind((host, port)) self.listen(5) - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except: self.handle_error() @@ -380,7 +474,7 @@ class http_listener(asyncore.dispatcher): self.log("Accepting connection") try: http_server(conn = self.accept()[0], handlers = self.handlers, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta) - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except: self.handle_error() @@ -393,23 +487,58 @@ class http_client(http_stream): parse_type = http_response - def __init__(self, queue, hostport, cert = None, key = None, ta = None, dynamic_ta = None): + def __init__(self, queue, hostport, cert = None, key = None, ta = ()): self.log("Creating new connection to %s" % repr(hostport)) - http_stream.__init__(self, cert = cert, key = key, ta = ta, dynamic_ta = dynamic_ta) + self.log("cert %r key %r ta %r" % (cert, key, ta)) + http_stream.__init__(self) self.queue = queue self.hostport = hostport self.state = "opening" self.expect_close = not want_persistent_client + self.cert = cert + self.key = key + self.ta = set(ta) + self.ta.discard(None) def start(self): try: self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(self.hostport) - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except: self.handle_error() + def handle_connect(self): + self.log("Connected") + self.set_state("idle") + + self.tls = POW.Ssl(POW.TLSV1_CLIENT_METHOD) + if debug_tls_certs: + self.log("HTTPS client cert issuer %s [%s] subject %s [%s]" % (self.cert.getIssuer(), self.cert.hAKI(), self.cert.getSubject(), self.cert.hSKI())) + self.tls.useCertificate(self.cert.get_POW()) + self.tls.useKey(self.key.get_POW()) + if not self.ta: + raise RuntimeError, "No trust anchor(s) specified, this is unlikely to work" + for x in self.ta: + if debug_tls_certs: + self.log("HTTPS trusted cert issuer %s [%s] subject %s [%s]" % (x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI())) + self.tls.trustCertificate(x.get_POW()) + self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT) + + self.tls.setFd(self.fileno()) + self.tls_connect() + + def tls_connect(self): + try: + self.tls.connect() + except POW.WantReadError: + self.retry_read = self.tls_connect + except POW.WantWriteError: + self.retry_write = self.tls_connect + else: + self.queue.send_request() + def set_state(self, state): self.log("State transition %s => %s" % (self.state, state)) self.state = state @@ -423,7 +552,7 @@ class http_client(http_stream): self.set_state("request-sent") msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" self.push(msg.format()) - self.restart(idle = False) + self.restart() def handle_message(self): self.log("Message received, state %s" % self.state) @@ -446,7 +575,7 @@ class http_client(http_stream): else: self.log("Idling") self.set_state("idle") - self.timer.set(self.idle_timeout) + self.update_timeout() if self.msg.code == 200: self.queue.return_result(self.msg) @@ -454,11 +583,6 @@ class http_client(http_stream): self.queue.return_result(rpki.exceptions.HTTPRequestFailed( "HTTPS request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body))) - def handle_connect(self): - self.log("Connected") - self.set_state("idle") - self.queue.send_request() - def handle_close(self): http_stream.handle_close(self) self.log("State %s" % self.state) @@ -479,7 +603,7 @@ class http_client(http_stream): self.queue.detach(self) try: raise - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except Exception, edata: self.queue.return_result(edata) @@ -488,15 +612,15 @@ class http_queue(object): log = logger - def __init__(self, hostport, cert = None, key = None, ta = None, dynamic_ta = None): + def __init__(self, hostport, cert = None, key = None, ta = ()): self.log("Creating queue for %s" % repr(hostport)) + self.log("cert %r key %r ta %r" % (cert, key, ta)) self.hostport = hostport self.client = None self.queue = [] self.cert = cert self.key = key self.ta = ta - self.dynamic_ta = dynamic_ta def request(self, *requests): self.log("Adding requests %r" % requests) @@ -504,7 +628,7 @@ class http_queue(object): def restart(self): if self.client is None: - client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta) + client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta) self.log("Attaching client %r" % client) self.client = client self.client.start() @@ -525,6 +649,9 @@ class http_queue(object): def return_result(self, result): + if not self.queue: + self.log("No caller, this should not happen. Dropping result %r" % result) + req = self.queue.pop(0) self.log("Dequeuing request %r" % req) @@ -536,7 +663,7 @@ class http_queue(object): assert isinstance(result, Exception) self.log("Returning exception %r to caller: %s" % (result, result)) req.errback(result) - except rpki.async.ExitNow: + except (rpki.async.ExitNow, SystemExit): raise except: self.log("Unhandled exception from callback") @@ -553,9 +680,6 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback): """ Open client HTTPS connection, send a message, set up callbacks to handle response. - - THIS VERSION DOES NOT DO TLS. THIS IS EXPERIMENTAL CODE. DO NOT - USE IN PRODUCTION UNTIL TLS SUPPORT HAS BEEN ADDED. """ u = urlparse.urlparse(url) @@ -583,8 +707,10 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback): if debug: rpki.log.debug("Created request %r for %r" % (request, hostport)) + if not isinstance(server_ta, (tuple, list)): + server_ta = (server_ta,) if hostport not in client_queues: - client_queues[hostport] = http_queue(hostport) + client_queues[hostport] = http_queue(hostport, cert = client_cert, key = client_key, ta = server_ta) client_queues[hostport].request(request) # Defer connection attempt until after we've had time to process any @@ -594,18 +720,18 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback): rpki.log.debug("Scheduling connection startup for %r" % request) rpki.async.timer(client_queues[hostport].restart, errback).set(None) -def server(handlers, server_key, server_cert, port, host ="", client_ta = None, dynamic_https_trust_anchor = None): +def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None): """ Run an HTTPS server and wait (forever) for connections. - - THIS VERSION DOES NOT DO TLS. THIS IS EXPERIMENTAL CODE. DO NOT - USE IN PRODUCTION UNTIL TLS SUPPORT HAS BEEN ADDED. """ if not isinstance(handlers, (tuple, list)): handlers = (("/", handlers),) - http_listener(port = port, handlers = handlers) + if not isinstance(client_ta, (tuple, list)): + server_ta = (client_ta,) + + http_listener(port = port, handlers = handlers, cert = server_cert, key = server_key, ta = client_ta, dynamic_ta = dynamic_https_trust_anchor) rpki.async.event_loop() def build_https_ta_cache(certs): |