00001 """
00002 HTTPS utilities, both client and server.
00003
00004 $Id: https.py 3282 2010-06-10 21:03:17Z sra $
00005
00006 Copyright (C) 2009-2010 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019
00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
00021
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034
00035 import time, socket, asyncore, asynchat, urlparse, sys, random
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import POW
00038
00039
00040
00041 rpki_content_type = "application/x-rpki"
00042
00043
00044
00045 debug_http = False
00046
00047
00048
00049 debug_tls_certs = False
00050
00051
00052
00053 want_persistent_client = False
00054
00055
00056
00057 want_persistent_server = False
00058
00059
00060
00061 default_client_timeout = rpki.sundial.timedelta(minutes = 15)
00062
00063
00064
00065
00066
00067
00068 default_server_timeout = rpki.sundial.timedelta(minutes = 20)
00069
00070
00071
00072 default_http_version = (1, 0)
00073
00074
00075
00076 default_tcp_port = 443
00077
00078
00079
00080
00081 enable_ipv6_servers = True
00082
00083
00084
00085
00086
00087 enable_ipv6_clients = False
00088
00089
00090
00091
00092 use_adns = False
00093
00094
00095
00096
00097
00098
00099
00100
00101 try:
00102 socket.socket(socket.AF_INET6).close()
00103 except:
00104 have_ipv6 = False
00105 else:
00106 have_ipv6 = True
00107
00108 def supported_address_families(enable_ipv6):
00109 """
00110 IP address families on which servers should listen, and to consider
00111 when selecting addresses for client connections.
00112 """
00113 if enable_ipv6 and have_ipv6:
00114 return (socket.AF_INET, socket.AF_INET6)
00115 else:
00116 return (socket.AF_INET,)
00117
00118 def localhost_addrinfo():
00119 """
00120 Return pseudo-getaddrinfo results for localhost.
00121 """
00122 result = [(socket.AF_INET, "127.0.0.1")]
00123 if enable_ipv6_clients and have_ipv6:
00124 result.append((socket.AF_INET6, "::1"))
00125 return result
00126
00127 class http_message(object):
00128 """
00129 Virtual class representing of one HTTP message.
00130 """
00131
00132 software_name = "ISC RPKI library"
00133
00134 def __init__(self, version = None, body = None, headers = None):
00135 self.version = version
00136 self.body = body
00137 self.headers = headers
00138 self.normalize_headers()
00139
00140 def normalize_headers(self, headers = None):
00141 """
00142 Clean up (some of) the horrible messes that HTTP allows in its
00143 headers.
00144 """
00145 if headers is None:
00146 headers = () if self.headers is None else self.headers.items()
00147 translate_underscore = True
00148 else:
00149 translate_underscore = False
00150 result = {}
00151 for k, v in headers:
00152 if translate_underscore:
00153 k = k.replace("_", "-")
00154 k = "-".join(s.capitalize() for s in k.split("-"))
00155 v = v.strip()
00156 if k in result:
00157 result[k] += ", " + v
00158 else:
00159 result[k] = v
00160 self.headers = result
00161
00162 @classmethod
00163 def parse_from_wire(cls, headers):
00164 """
00165 Parse and normalize an incoming HTTP message.
00166 """
00167 self = cls()
00168 headers = headers.split("\r\n")
00169 self.parse_first_line(*headers.pop(0).split(None, 2))
00170 for i in xrange(len(headers) - 2, -1, -1):
00171 if headers[i + 1][0].isspace():
00172 headers[i] += headers[i + 1]
00173 del headers[i + 1]
00174 self.normalize_headers([h.split(":", 1) for h in headers])
00175 return self
00176
00177 def format(self):
00178 """
00179 Format an outgoing HTTP message.
00180 """
00181 s = self.format_first_line()
00182 if self.body is not None:
00183 assert isinstance(self.body, str)
00184 self.headers["Content-Length"] = len(self.body)
00185 for kv in self.headers.iteritems():
00186 s += "%s: %s\r\n" % kv
00187 s += "\r\n"
00188 if self.body is not None:
00189 s += self.body
00190 return s
00191
00192 def __str__(self):
00193 return self.format()
00194
00195 def parse_version(self, version):
00196 """
00197 Parse HTTP version, raise an exception if we can't.
00198 """
00199 if version[:5] != "HTTP/":
00200 raise rpki.exceptions.HTTPSBadVersion, "Couldn't parse version %s" % version
00201 self.version = tuple(int(i) for i in version[5:].split("."))
00202
00203 def persistent(self):
00204 """
00205 Figure out whether this HTTP message encourages a persistent connection.
00206 """
00207 c = self.headers.get("Connection")
00208 if self.version == (1, 1):
00209 return c is None or "close" not in c.lower()
00210 elif self.version == (1, 0):
00211 return c is not None and "keep-alive" in c.lower()
00212 else:
00213 return False
00214
00215 class http_request(http_message):
00216 """
00217 HTTP request message.
00218 """
00219
00220 def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
00221 assert cmd == "POST" or body is None
00222 http_message.__init__(self, version = version, body = body, headers = headers)
00223 self.cmd = cmd
00224 self.path = path
00225 self.callback = callback
00226 self.errback = errback
00227 self.retried = False
00228
00229 def parse_first_line(self, cmd, path, version):
00230 """
00231 Parse first line of HTTP request message.
00232 """
00233 self.parse_version(version)
00234 self.cmd = cmd
00235 self.path = path
00236
00237 def format_first_line(self):
00238 """
00239 Format first line of HTTP request message, and set up the
00240 User-Agent header.
00241 """
00242 self.headers.setdefault("User-Agent", self.software_name)
00243 return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
00244
00245 class http_response(http_message):
00246 """
00247 HTTP response message.
00248 """
00249
00250 def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
00251 http_message.__init__(self, version = version, body = body, headers = headers)
00252 self.code = code
00253 self.reason = reason
00254
00255 def parse_first_line(self, version, code, reason):
00256 """
00257 Parse first line of HTTP response message.
00258 """
00259 self.parse_version(version)
00260 self.code = int(code)
00261 self.reason = reason
00262
00263 def format_first_line(self):
00264 """
00265 Format first line of HTTP response message, and set up Date and
00266 Server headers.
00267 """
00268 self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
00269 self.headers.setdefault("Server", self.software_name)
00270 return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00271
00272 def log_method(self, msg, logger = rpki.log.debug):
00273 """
00274 Logging method used in several different classes.
00275 """
00276 assert isinstance(logger, rpki.log.logger)
00277 if debug_http or logger is not rpki.log.debug:
00278 logger("%r: %s" % (self, msg))
00279
00280 class http_stream(asynchat.async_chat):
00281 """
00282 Virtual class representing an HTTP message stream.
00283 """
00284
00285 log = log_method
00286 tls = None
00287 retry_read = None
00288 retry_write = None
00289
00290 def __init__(self, sock = None):
00291 asynchat.async_chat.__init__(self, sock)
00292 self.buffer = []
00293 self.timer = rpki.async.timer(self.handle_timeout)
00294 self.restart()
00295
00296 def restart(self):
00297 """
00298 (Re)start HTTP message parser, reset timer.
00299 """
00300 assert not self.buffer
00301 self.chunk_handler = None
00302 self.set_terminator("\r\n\r\n")
00303 self.update_timeout()
00304
00305 def update_timeout(self):
00306 """
00307 Put this stream's timer in known good state: set it to the
00308 stream's timeout value if we're doing timeouts, otherwise clear
00309 it.
00310 """
00311 if self.timeout is not None:
00312 self.log("Setting timeout %r" % self.timeout)
00313 self.timer.set(self.timeout)
00314 else:
00315 self.log("Clearing timeout")
00316 self.timer.cancel()
00317
00318 def collect_incoming_data(self, data):
00319 """
00320 Buffer incoming data from asynchat.
00321 """
00322 self.buffer.append(data)
00323 self.update_timeout()
00324
00325 def get_buffer(self):
00326 """
00327 Consume data buffered from asynchat.
00328 """
00329 val = "".join(self.buffer)
00330 self.buffer = []
00331 return val
00332
00333 def found_terminator(self):
00334 """
00335 Asynchat reported that it found whatever terminator we set, so
00336 figure out what to do next. This can be messy, because we can be
00337 in any of several different states:
00338
00339 @li We might be handling chunked HTTP, in which case we have to
00340 initialize the chunk decoder;
00341
00342 @li We might have found the end of the message body, in which case
00343 we can (finally) process it; or
00344
00345 @li We might have just gotten to the end of the message headers,
00346 in which case we have to parse them to figure out which of three
00347 separate mechanisms (chunked, content-length, TCP close) is going
00348 to tell us how to find the end of the message body.
00349 """
00350 self.update_timeout()
00351 if self.chunk_handler:
00352 self.chunk_handler()
00353 elif not isinstance(self.get_terminator(), str):
00354 self.handle_body()
00355 else:
00356 self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00357 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00358 self.msg.body = []
00359 self.chunk_handler = self.chunk_header
00360 self.set_terminator("\r\n")
00361 elif "Content-Length" in self.msg.headers:
00362 self.set_terminator(int(self.msg.headers["Content-Length"]))
00363 else:
00364 self.handle_no_content_length()
00365
00366 def chunk_header(self):
00367 """
00368 Asynchat just handed us what should be the header of one chunk of
00369 a chunked encoding stream. If this chunk has a body, set the
00370 stream up to read it; otherwise, this is the last chunk, so start
00371 the process of exiting the chunk decoder.
00372 """
00373 n = int(self.get_buffer().partition(";")[0], 16)
00374 self.log("Chunk length %s" % n)
00375 if n:
00376 self.chunk_handler = self.chunk_body
00377 self.set_terminator(n)
00378 else:
00379 self.msg.body = "".join(self.msg.body)
00380 self.chunk_handler = self.chunk_discard_trailer
00381
00382 def chunk_body(self):
00383 """
00384 Asynchat just handed us what should be the body of a chunk of the
00385 body of a chunked message (sic). Save it, and prepare to move on
00386 to the next chunk.
00387 """
00388 self.log("Chunk body")
00389 self.msg.body += self.buffer
00390 self.buffer = []
00391 self.chunk_handler = self.chunk_discard_crlf
00392 self.set_terminator("\r\n")
00393
00394 def chunk_discard_crlf(self):
00395 """
00396 Consume the CRLF that terminates a chunk, reinitialize chunk
00397 decoder to be ready for the next chunk.
00398 """
00399 self.log("Chunk CRLF")
00400 s = self.get_buffer()
00401 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00402 self.chunk_handler = self.chunk_header
00403
00404 def chunk_discard_trailer(self):
00405 """
00406 Consume chunk trailer, which should be empty, then (finally!) exit
00407 the chunk decoder and hand complete message off to the application.
00408 """
00409 self.log("Chunk trailer")
00410 s = self.get_buffer()
00411 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00412 self.chunk_handler = None
00413 self.handle_message()
00414
00415 def handle_body(self):
00416 """
00417 Hand normal (not chunked) message off to the application.
00418 """
00419 self.msg.body = self.get_buffer()
00420 self.handle_message()
00421
00422 def handle_error(self):
00423 """
00424 Asynchat (or asyncore, or somebody) raised an exception. See
00425 whether it's one we should just pass along, otherwise log a stack
00426 trace and close the stream.
00427 """
00428 etype = sys.exc_info()[0]
00429 if etype in (SystemExit, rpki.async.ExitNow):
00430 self.log("Caught %s, propagating" % etype.__name__)
00431 raise
00432 self.log("Error in HTTP stream handler", rpki.log.warn)
00433 rpki.log.traceback()
00434 if etype not in (rpki.exceptions.HTTPSClientAborted,):
00435 self.log("Closing due to error", rpki.log.warn)
00436 self.close(force = True)
00437
00438 def handle_timeout(self):
00439 """
00440 Inactivity timer expired, close connection with prejudice.
00441 """
00442 self.log("Timeout, closing")
00443 self.close(force = True)
00444
00445 def handle_close(self):
00446 """
00447 Wrapper around asynchat connection close handler, so that we can
00448 log the event.
00449 """
00450 self.log("Close event in HTTP stream handler")
00451 asynchat.async_chat.handle_close(self)
00452
00453 def send(self, data):
00454 """
00455 TLS replacement for normal asyncore .send() method. Throw an
00456 exception if TLS hasn't been started or if TLS I/O was already in
00457 progress, otherwise hand off to the TLS code.
00458 """
00459 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00460 assert self.tls is not None
00461 return self.tls.write(data)
00462
00463 def recv(self, buffer_size):
00464 """
00465 TLS replacement for normal asyncore .recv() method. Throw an
00466 exception if TLS hasn't been started or if TLS I/O was already in
00467 progress, otherwise hand off to the TLS code.
00468 """
00469 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00470 assert self.tls is not None
00471 return self.tls.read(buffer_size)
00472
00473 def readable(self):
00474 """
00475 TLS replacement for normal asynchat .readable() method. A TLS
00476 connection that's blocked waiting for TLS write is considered not
00477 readable even if the underlying socket is.
00478 """
00479 return self.retry_read is not None or (self.retry_write is None and asynchat.async_chat.readable(self))
00480
00481 def writeable(self):
00482 """
00483 TLS replacement for normal asynchat .writeable() method. A TLS
00484 connection that's blocked waiting for TLS read is considered not
00485 writeable even if the underlying socket is.
00486 """
00487 return self.retry_write is not None or (self.retry_read is None and asynchat.async_chat.writeable(self))
00488
00489 def handle_read(self):
00490 """
00491 Asyncore says socket is readable. Make sure there's no TLS write
00492 already in progress, retry previous read operation if we had one
00493 that was waiting for more input, otherwise try to read some data,
00494 and handle all the weird OpenSSL exceptions that the TLS code
00495 throws.
00496 """
00497 assert self.retry_write is None, "%r: TLS I/O already in progress, w %r" % (self, self.retry_write)
00498 if self.retry_read is not None:
00499 thunk = self.retry_read
00500 self.retry_read = None
00501 self.log("Retrying TLS read %r" % thunk)
00502 thunk()
00503 else:
00504 try:
00505 asynchat.async_chat.handle_read(self)
00506 except POW.WantReadError:
00507 self.retry_read = self.handle_read
00508 except POW.WantWriteError:
00509 self.retry_write = self.handle_read
00510 except POW.ZeroReturnError:
00511 self.log("ZeroReturn in handle_read()")
00512 self.handle_close()
00513 except POW.SSLUnexpectedEOFError:
00514 self.log("SSLUnexpectedEOF in handle_read()", rpki.log.warn)
00515 self.handle_error()
00516
00517 def handle_write(self):
00518 """
00519 Asyncore says socket is writeable. Make sure there's no TLS read
00520 already in progress, retry previous write operation if we had one
00521 that was blocked on the socket, otherwise try to write some data.
00522 Handling all the weird OpenSSL exceptions that TLS throws is our
00523 caller's problem.
00524 """
00525
00526
00527
00528
00529
00530 if self.retry_read is not None:
00531 self.log("TLS I/O already in progress, r %r" % self.retry_read)
00532 return
00533 if self.retry_write is not None:
00534 thunk = self.retry_write
00535 self.retry_write = None
00536 thunk()
00537 self.log("Retrying TLS write %r" % thunk)
00538 else:
00539 asynchat.async_chat.handle_write(self)
00540
00541 def initiate_send(self):
00542 """
00543 Initiate a write operation. This is just a wrapper around the
00544 asynchat method, to handle all the whacky TLS exceptions.
00545 """
00546 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00547 try:
00548 asynchat.async_chat.initiate_send(self)
00549 except POW.WantReadError:
00550 self.retry_read = self.initiate_send
00551 except POW.WantWriteError:
00552 self.retry_write = self.initiate_send
00553 except POW.ZeroReturnError:
00554 self.log("ZeroReturn in initiate_send()")
00555 self.handle_close()
00556 except POW.SSLUnexpectedEOFError:
00557 self.log("SSLUnexpectedEOF in initiate_send()", rpki.log.warn)
00558 self.handle_error()
00559
00560 def close(self, force = False):
00561 """
00562 Close the stream.
00563
00564 Graceful shutdown of a TLS connection requires multiple calls to
00565 the underlying TLS code. If the connection should be closed right
00566 now without waiting (perhaps because it's already dead and we're
00567 just cleaning up), call with force = True.
00568 """
00569 self.log("Close requested")
00570 assert force or (self.retry_read is None and self.retry_write is None), "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00571 if self.tls is not None:
00572 try:
00573 if self.retry_read is None and self.retry_write is None:
00574 ret = self.tls.shutdown()
00575 else:
00576 ret = None
00577 self.log("tls.shutdown() returned %s, force_shutdown %s" % (ret, force))
00578 if ret or force:
00579 self.tls = None
00580 except POW.WantReadError:
00581 self.retry_read = self.close
00582 except POW.WantWriteError:
00583 self.retry_write = self.close
00584 except POW.SSLError, e:
00585 self.log("tls.shutdown() threw %s, shutting down anyway" % e)
00586 self.tls = None
00587 if self.tls is None:
00588 self.log("TLS layer is done, closing socket")
00589 self.timer.cancel()
00590 self.timer.set_handler(None)
00591 try:
00592 asynchat.async_chat.close(self)
00593 except AttributeError:
00594 if getattr(self, "socket", None) is not None:
00595 raise
00596
00597 def log_cert(self, tag, x):
00598 """
00599 Log HTTPS certificates, if certificate debugging is enabled.
00600 """
00601 if debug_tls_certs:
00602 rpki.log.debug("%r: HTTPS %s cert %r issuer %s [%s] subject %s [%s]" % (self, tag, x, x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI()))
00603
00604 class http_server(http_stream):
00605 """
00606 HTTP(S) server stream.
00607 """
00608
00609
00610
00611 parse_type = http_request
00612
00613
00614
00615 timeout = default_server_timeout
00616
00617 def __init__(self, sock, handlers, cert = None, key = None, ta = (), dynamic_ta = None):
00618 self.log("Starting")
00619 self.handlers = handlers
00620 http_stream.__init__(self, sock = sock)
00621 self.expect_close = not want_persistent_server
00622
00623 self.log("cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00624
00625 self.tls = POW.Ssl(POW.TLSV1_SERVER_METHOD)
00626 self.log_cert("server", cert)
00627 self.tls.useCertificate(cert.get_POW())
00628 self.tls.useKey(key.get_POW())
00629 ta = rpki.x509.X509.normalize_chain(dynamic_ta() if dynamic_ta else ta)
00630 assert ta
00631 for x in ta:
00632 self.log_cert("trusted", x)
00633 self.tls.addTrust(x.get_POW())
00634 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00635
00636 self.tls.setFd(self.fileno())
00637 self.tls_accept()
00638
00639 def tls_accept(self):
00640 """
00641 Set up TLS for server side connection, handling all the whacky
00642 OpenSSL exceptions from TLS.
00643
00644 SSLErrorSSLError exceptions are particularly nasty, because all
00645 too often they indicate a certificate lookup failure deep within
00646 the guts of OpenSSL's TLS connection setup logic. Extracting
00647 anything resembling a Python data structure from a handler called
00648 that deep inside the OpenSSL TLS library, while theoretically
00649 possible, runs a high risk of triggering some kind of memory leak
00650 or corruption. So, for now, we just get back a long text string,
00651 which we break up and log but don't attempt to process further.
00652 """
00653 try:
00654 self.tls.accept()
00655 except POW.WantReadError:
00656 self.retry_read = self.tls_accept
00657 except POW.WantWriteError:
00658 self.retry_write = self.tls_accept
00659 except POW.SSLUnexpectedEOFError:
00660 self.close(force = True)
00661 except POW.SSLErrorSSLError, e:
00662 if "\n" in e:
00663 for line in str(e).splitlines():
00664 rpki.log.error(line)
00665 raise POW.SSLErrorSSLError, "TLS certificate problem, most likely"
00666 else:
00667 raise
00668
00669 def handle_no_content_length(self):
00670 """
00671 Handle an incoming message that used neither chunking nor a
00672 Content-Length header (that is: this message will be the last one
00673 in this server stream). No special action required.
00674 """
00675 self.handle_message()
00676
00677 def find_handler(self, path):
00678 """
00679 Helper method to search self.handlers.
00680 """
00681 for s, h in self.handlers:
00682 if path.startswith(s):
00683 return h
00684 return None
00685
00686 def handle_message(self):
00687 """
00688 TLS and HTTP layers managed to deliver a complete HTTP request to
00689 us, figure out what to do with it. Check the command and
00690 Content-Type, look for a handler, and if everything looks right,
00691 pass the message body, path, and a reply callback to the handler.
00692 """
00693 self.log("Received request %s %s" % (self.msg.cmd, self.msg.path))
00694 if not self.msg.persistent():
00695 self.expect_close = True
00696 handler = self.find_handler(self.msg.path)
00697 error = None
00698 if self.msg.cmd != "POST":
00699 error = 501, "No handler for method %s" % self.msg.cmd
00700 elif self.msg.headers["Content-Type"] != rpki_content_type:
00701 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00702 elif handler is None:
00703 error = 404, "No handler for URL %s" % self.msg.path
00704 if error is None:
00705 try:
00706 handler(self.msg.body, self.msg.path, self.send_reply)
00707 except (rpki.async.ExitNow, SystemExit):
00708 raise
00709 except Exception, e:
00710 rpki.log.traceback()
00711 self.send_error(500, "Unhandled exception %s" % e)
00712 else:
00713 self.send_error(code = error[0], reason = error[1])
00714
00715 def send_error(self, code, reason):
00716 """
00717 Send an error response to this request.
00718 """
00719 self.send_message(code = code, reason = reason)
00720
00721 def send_reply(self, code, body):
00722 """
00723 Send a reply to this request.
00724 """
00725 self.send_message(code = code, body = body)
00726
00727 def send_message(self, code, reason = "OK", body = None):
00728 """
00729 Queue up reply message. If both parties agree that connection is
00730 persistant, and if no error occurred, restart this stream to
00731 listen for next message; otherwise, queue up a close event for
00732 this stream so it will shut down once the reply has been sent.
00733 """
00734 self.log("Sending response %s %s" % (code, reason))
00735 if code >= 400:
00736 self.expect_close = True
00737 msg = http_response(code = code, reason = reason, body = body,
00738 Content_Type = rpki_content_type,
00739 Connection = "Close" if self.expect_close else "Keep-Alive")
00740 self.push(msg.format())
00741 if self.expect_close:
00742 self.log("Closing")
00743 self.timer.cancel()
00744 self.close_when_done()
00745 else:
00746 self.log("Listening for next message")
00747 self.restart()
00748
00749 class http_listener(asyncore.dispatcher):
00750 """
00751 Listener for incoming HTTP(S) connections.
00752 """
00753
00754 log = log_method
00755
00756 def __init__(self, handlers, addrinfo, cert = None, key = None, ta = None, dynamic_ta = None):
00757 self.log("Listener cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00758 asyncore.dispatcher.__init__(self)
00759 self.handlers = handlers
00760 self.cert = cert
00761 self.key = key
00762 self.ta = ta
00763 self.dynamic_ta = dynamic_ta
00764 try:
00765 af, socktype, proto, canonname, sockaddr = addrinfo
00766 self.create_socket(af, socktype)
00767 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00768 if hasattr(socket, "SO_REUSEPORT"):
00769 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00770 self.bind(sockaddr)
00771 self.listen(5)
00772 except (rpki.async.ExitNow, SystemExit):
00773 raise
00774 except:
00775 self.handle_error()
00776 self.log("Listening on %r, handlers %r" % (sockaddr, handlers))
00777
00778 def handle_accept(self):
00779 """
00780 Asyncore says we have an incoming connection, spawn an http_server
00781 stream for it and pass along all of our handler and TLS data.
00782 """
00783 self.log("Accepting connection")
00784 try:
00785 s, client = self.accept()
00786 self.log("Accepting connection from %r" % (client,))
00787 http_server(sock = s, handlers = self.handlers, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta)
00788 except (rpki.async.ExitNow, SystemExit):
00789 raise
00790 except:
00791 self.handle_error()
00792
00793 def handle_error(self):
00794 """
00795 Asyncore signaled an error, pass it along or log it.
00796 """
00797 if sys.exc_info()[0] is SystemExit:
00798 self.log("Caught SystemExit, propagating")
00799 raise
00800 else:
00801 self.log("Error in HTTP listener", rpki.log.warn)
00802 rpki.log.traceback()
00803
00804 class http_client(http_stream):
00805 """
00806 HTTP(S) client stream.
00807 """
00808
00809
00810
00811 parse_type = http_response
00812
00813
00814
00815 timeout = default_client_timeout
00816
00817 def __init__(self, queue, hostport, cert = None, key = None, ta = ()):
00818 self.log("Creating new connection to %r" % (hostport,))
00819 self.log("cert %r key %r ta %r" % (cert, key, ta))
00820 http_stream.__init__(self)
00821 self.queue = queue
00822 self.host = hostport[0]
00823 self.port = hostport[1]
00824 self.state = "opening"
00825 self.expect_close = not want_persistent_client
00826 self.cert = cert
00827 self.key = key
00828 self.ta = rpki.x509.X509.normalize_chain(ta)
00829
00830 def start(self):
00831 """
00832 Create socket and request a connection.
00833 """
00834 if not use_adns:
00835 self.gotaddrinfo([(socket.AF_INET, self.host)])
00836 elif self.host == "localhost":
00837 self.gotaddrinfo(localhost_addrinfo())
00838 else:
00839 import rpki.adns
00840 rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, supported_address_families(enable_ipv6_clients))
00841
00842 def dns_error(self, e):
00843 """
00844 Handle DNS lookup errors. For now, just whack the connection.
00845 Undoubtedly we should do something better with diagnostics here.
00846 """
00847 self.handle_error()
00848
00849 def gotaddrinfo(self, addrinfo):
00850 """
00851 Got address data from DNS, create socket and request connection.
00852 """
00853 try:
00854 self.af, self.addr = random.choice(addrinfo)
00855 self.create_socket(self.af, socket.SOCK_STREAM)
00856 self.connect((self.addr, self.port))
00857 except (rpki.async.ExitNow, SystemExit):
00858 raise
00859 except:
00860 self.handle_error()
00861
00862 def handle_connect(self):
00863 """
00864 Asyncore says socket has connected, configure TLS junk.
00865 """
00866 self.log("Socket connected")
00867 self.tls = POW.Ssl(POW.TLSV1_CLIENT_METHOD)
00868 self.log_cert("client", self.cert)
00869 self.tls.useCertificate(self.cert.get_POW())
00870 self.tls.useKey(self.key.get_POW())
00871 assert self.ta
00872 for x in self.ta:
00873 self.log_cert("trusted", x)
00874 self.tls.addTrust(x.get_POW())
00875 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00876 self.tls.setFd(self.fileno())
00877 self.tls_connect()
00878
00879 def tls_connect(self):
00880 """
00881 Initialize client side of TLS.
00882 """
00883 try:
00884 self.tls.connect()
00885 except POW.WantReadError:
00886 self.retry_read = self.tls_connect
00887 except POW.WantWriteError:
00888 self.retry_write = self.tls_connect
00889 else:
00890 self.log("TLS connected")
00891 self.set_state("idle")
00892 self.queue.send_request()
00893
00894 def set_state(self, state):
00895 """
00896 Set HTTP client connection state.
00897 """
00898 self.log("State transition %s => %s" % (self.state, state))
00899 self.state = state
00900
00901 def handle_no_content_length(self):
00902 """
00903 Handle response message that used neither chunking nor a
00904 Content-Length header (that is: this message will be the last one
00905 in this server stream). In this case we want to read until we
00906 reach the end of the data stream.
00907 """
00908 self.set_terminator(None)
00909
00910 def send_request(self, msg):
00911 """
00912 Queue up request message and kickstart connection.
00913 """
00914 self.log("Sending request %r" % msg)
00915 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00916 self.set_state("request-sent")
00917 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00918 self.push(msg.format())
00919 self.restart()
00920
00921 def handle_message(self):
00922 """
00923 Handle incoming HTTP response message. Make sure we're in a state
00924 where we expect to see such a message (and allow the mysterious
00925 empty messages that Apache sends during connection close, no idea
00926 what that is supposed to be about). If everybody agrees that the
00927 connection should stay open, put it into an idle state; otherwise,
00928 arrange for the stream to shut down.
00929 """
00930
00931 self.log("Message received, state %s" % self.state)
00932
00933 if not self.msg.persistent():
00934 self.expect_close = True
00935
00936 if self.state != "request-sent":
00937 if self.state == "closing":
00938 assert not self.msg.body
00939 self.log("Ignoring empty response received while closing")
00940 return
00941 raise rpki.exceptions.HTTPSUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00942
00943 if self.expect_close:
00944 self.log("Closing")
00945 self.set_state("closing")
00946 self.queue.detach(self)
00947 self.close_when_done()
00948 else:
00949 self.log("Idling")
00950 self.set_state("idle")
00951 self.update_timeout()
00952
00953 if self.msg.code != 200:
00954 raise rpki.exceptions.HTTPRequestFailed, "HTTPS request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00955 self.queue.return_result(self.msg)
00956
00957 def handle_close(self):
00958 """
00959 Asyncore signaled connection close. If we were waiting for that
00960 to find the end of a response message, process the resulting
00961 message now; if we were waiting for the response to a request we
00962 sent, signal the error.
00963 """
00964 http_stream.handle_close(self)
00965 self.log("State %s" % self.state)
00966 self.queue.detach(self)
00967 if self.get_terminator() is None:
00968 self.handle_body()
00969 elif self.state == "request-sent":
00970 raise rpki.exceptions.HTTPSClientAborted, "HTTPS request aborted by close event"
00971
00972 def handle_timeout(self):
00973 """
00974 Connection idle timer has expired. Shut down connection in any
00975 case, noisily if we weren't idle.
00976 """
00977 if self.state != "idle":
00978 self.log("Timeout while in state %s" % self.state, rpki.log.warn)
00979 http_stream.handle_timeout(self)
00980 self.queue.detach(self)
00981 if self.state != "idle":
00982 try:
00983 raise rpki.exceptions.HTTPTimeout
00984 except rpki.exceptions.HTTPTimeout, e:
00985 self.queue.return_result(e)
00986
00987 def handle_error(self):
00988 """
00989 Asyncore says something threw an exception. Log it, then shut
00990 down the connection and pass back the exception.
00991 """
00992 eclass, edata = sys.exc_info()[0:2]
00993 self.log("Error on HTTP client connection %s:%s: %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn)
00994 http_stream.handle_error(self)
00995 self.queue.detach(self)
00996 self.queue.return_result(edata)
00997
00998 class http_queue(object):
00999 """
01000 Queue of pending HTTP requests for a single destination. This class
01001 is very tightly coupled to http_client; http_client handles the HTTP
01002 stream itself, this class provides a slightly higher-level API.
01003 """
01004
01005 log = log_method
01006
01007 def __init__(self, hostport, cert = None, key = None, ta = ()):
01008 self.log("Creating queue for %r" % (hostport,))
01009 self.log("cert %r key %r ta %r" % (cert, key, ta))
01010 self.hostport = hostport
01011 self.client = None
01012 self.queue = []
01013 self.cert = cert
01014 self.key = key
01015 self.ta = ta
01016
01017 def request(self, *requests):
01018 """
01019 Append http_request object(s) to this queue.
01020 """
01021 self.log("Adding requests %r" % requests)
01022 self.queue.extend(requests)
01023
01024 def restart(self):
01025 """
01026 Send next request for this queue, if we can. This may involve
01027 starting a new http_client stream, reusing an existing idle
01028 stream, or just ignoring this request if there's an active client
01029 stream already; in the last case, handling of the response (or
01030 exception, or timeout) for the query currently in progress will
01031 call this method when it's time to kick out the next query.
01032 """
01033 try:
01034 if self.client is None:
01035 self.client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta)
01036 self.log("Attached client %r" % self.client)
01037 self.client.start()
01038 elif self.client.state == "idle":
01039 self.log("Sending request to existing client %r" % self.client)
01040 self.send_request()
01041 else:
01042 self.log("Client %r exists in state %r" % (self.client, self.client.state))
01043 except (rpki.async.ExitNow, SystemExit):
01044 raise
01045 except Exception, e:
01046 self.return_result(e)
01047
01048 def send_request(self):
01049 """
01050 Kick out the next query in this queue, if any.
01051 """
01052 if self.queue:
01053 self.client.send_request(self.queue[0])
01054
01055 def detach(self, client_):
01056 """
01057 Detatch a client from this queue. Silently ignores attempting to
01058 detach a client that is not attached to this queue, to simplify
01059 handling of what otherwise would be a nasty set of race
01060 conditions.
01061 """
01062 if client_ is self.client:
01063 self.log("Detaching client %r" % client_)
01064 self.client = None
01065
01066 def return_result(self, result):
01067 """
01068 Client stream has returned a result, which we need to pass along
01069 to the original caller. Result may be either an HTTP response
01070 message or an exception. In either case, once we're done
01071 processing this result, kick off next message in the queue, if any.
01072 """
01073
01074 if not self.queue:
01075 self.log("No caller, this should not happen. Dropping result %r" % result)
01076
01077 req = self.queue.pop(0)
01078 self.log("Dequeuing request %r" % req)
01079
01080 try:
01081 if isinstance(result, http_response):
01082 self.log("Returning result %r to caller" % result)
01083 req.callback(result.body)
01084 else:
01085 assert isinstance(result, Exception)
01086 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
01087 req.errback(result)
01088 except (rpki.async.ExitNow, SystemExit):
01089 raise
01090 except:
01091 self.log("Unhandled exception from callback")
01092 rpki.log.traceback()
01093
01094 self.log("Queue: %r" % self.queue)
01095
01096 if self.queue:
01097 self.restart()
01098
01099
01100
01101 client_queues = {}
01102
01103 def client(msg, client_key, client_cert, server_ta, url, callback, errback):
01104 """
01105 Open client HTTPS connection, send a message, set up callbacks to
01106 handle response.
01107 """
01108
01109 u = urlparse.urlparse(url)
01110
01111 if (u.scheme not in ("", "https") or
01112 u.username is not None or
01113 u.password is not None or
01114 u.params != "" or
01115 u.query != "" or
01116 u.fragment != ""):
01117 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
01118
01119 if debug_http:
01120 rpki.log.debug("Contacting %s" % url)
01121
01122 request = http_request(
01123 cmd = "POST",
01124 path = u.path,
01125 body = msg,
01126 callback = callback,
01127 errback = errback,
01128 Host = u.hostname,
01129 Content_Type = rpki_content_type)
01130
01131 hostport = (u.hostname or "localhost", u.port or default_tcp_port)
01132
01133 if debug_http:
01134 rpki.log.debug("Created request %r for %r" % (request, hostport))
01135 if not isinstance(server_ta, (tuple, list)):
01136 server_ta = (server_ta,)
01137 if hostport not in client_queues:
01138 client_queues[hostport] = http_queue(hostport, cert = client_cert, key = client_key, ta = server_ta)
01139 client_queues[hostport].request(request)
01140
01141
01142
01143
01144 if debug_http:
01145 rpki.log.debug("Scheduling connection startup for %r" % request)
01146 rpki.async.defer(client_queues[hostport].restart)
01147
01148 def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None):
01149 """
01150 Run an HTTPS server and wait (forever) for connections.
01151 """
01152
01153 if not isinstance(handlers, (tuple, list)):
01154 handlers = (("/", handlers),)
01155
01156 if not isinstance(client_ta, (tuple, list)):
01157 client_ta = (client_ta,)
01158
01159 for af in supported_address_families(enable_ipv6_servers):
01160 try:
01161 for addrinfo in socket.getaddrinfo(host if host else "::" if have_ipv6 and af == socket.AF_INET6 else "0.0.0.0",
01162 port, af, socket.SOCK_STREAM):
01163 http_listener(addrinfo = addrinfo, handlers = handlers, cert = server_cert, key = server_key, ta = client_ta, dynamic_ta = dynamic_https_trust_anchor)
01164 except socket.gaierror, e:
01165 rpki.log.info("getaddrinfo() error for AF %d, host %s, port %s, skipping address family: %s" % (af, host, port, e))
01166 rpki.async.event_loop()
01167
01168 def build_https_ta_cache(certs):
01169 """
01170 Package up a collection of certificates into a form suitable for use
01171 as a dynamic HTTPS trust anchor set. Precise format of this
01172 collection is an internal conspiracy within the rpki.https module;
01173 at one point it was a POW.X509Store object, at the moment it's a
01174 Python set, what it will be tomorow is nobody else's business.
01175 """
01176
01177 return set(certs)
01178
01179 class caller(object):
01180 """
01181 Handle client-side mechanics for protocols based on HTTPS, CMS, and
01182 rpki.xml_utils. Calling sequence is intended to nest within
01183 rpki.async.sync_wrapper.
01184 """
01185
01186 debug = False
01187
01188 def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
01189 self.proto = proto
01190 self.client_key = client_key
01191 self.client_cert = client_cert
01192 self.server_ta = server_ta
01193 self.server_cert = server_cert
01194 self.url = url
01195 if debug is not None:
01196 self.debug = debug
01197
01198 def __call__(self, cb, eb, *pdus):
01199
01200 def done(cms):
01201 """
01202 Handle CMS-wrapped XML response message.
01203 """
01204 result = self.proto.cms_msg.unwrap(cms, (self.server_ta, self.server_cert), pretty_print = self.debug)
01205 if self.debug:
01206 msg, xml = result
01207 print "<!-- Reply -->"
01208 print xml
01209 else:
01210 msg = result
01211 cb(msg)
01212
01213 msg = self.proto.msg.query(*pdus)
01214 result = self.proto.cms_msg.wrap(msg, self.client_key, self.client_cert, pretty_print = self.debug)
01215 if self.debug:
01216 cms, xml = result
01217 print "<!-- Query -->"
01218 print xml
01219 else:
01220 cms = result
01221
01222 client(
01223 client_key = self.client_key,
01224 client_cert = self.client_cert,
01225 server_ta = self.server_ta,
01226 url = self.url,
01227 msg = cms,
01228 callback = done,
01229 errback = eb)