00001 """
00002 HTTPS utilities, both client and server.
00003
00004 $Id: https.py 3191 2010-04-12 23:07:16Z sra $
00005
00006 Copyright (C) 2009-2010 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019
00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
00021
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034
00035 import time, socket, asyncore, asynchat, urlparse, sys
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import POW
00038
00039
00040
# MIME type attached to every reply the server sends, and the only
# Content-Type the server accepts on incoming requests.
rpki_content_type = "application/x-rpki"

# When true, log_method() also emits messages sent to rpki.log.debug.
debug_http = False

# When true, log_cert() logs the TLS certificates in use.
debug_tls_certs = False

# Whether the client / server side starts out willing to keep a
# connection open across messages (each stream's expect_close is
# initialized to the negation of the matching flag).
want_persistent_client = False
want_persistent_server = False

# Inactivity timeouts applied to client and server streams.
default_client_timeout = rpki.sundial.timedelta(minutes = 15)
default_server_timeout = rpki.sundial.timedelta(minutes = 20)

# HTTP version used for messages we originate.
default_http_version = (1, 0)

# Address families we are prepared to use.  IPv4 only by default; the
# IPv4 + IPv6 alternative is present but switched off by the
# hard-coded False.
supported_address_families = (socket.AF_INET,)
if False:
    supported_address_families = (socket.AF_INET, socket.AF_INET6)
00088
00089
class http_message(object):
    """
    Abstract base class representing one HTTP message.

    Subclasses supply parse_first_line() and format_first_line(); this
    class handles the header block, the body, and the HTTP version.
    """

    software_name = "ISC RPKI library"

    def __init__(self, version = None, body = None, headers = None):
        self.version = version
        self.body = body
        self.headers = headers
        self.normalize_headers()

    def normalize_headers(self, headers = None):
        """
        Clean up (some of) the horrible messes that HTTP allows in its
        headers, storing the result as a dict in self.headers.

        With no argument, self.headers (a dict or None) is normalized
        and underscores in names become hyphens, so that headers can be
        given as Python keyword arguments.  With an argument, headers is
        an iterable of (name, value) pairs as read from the wire, and
        underscores are left alone.

        In both cases each hyphen-separated word of the name is
        capitalized, values are stripped of surrounding whitespace, and
        repeated names are merged into one comma-separated value.
        """
        translate_underscore = headers is None
        if translate_underscore:
            headers = () if self.headers is None else self.headers.items()
        result = {}
        for k, v in headers:
            if translate_underscore:
                k = k.replace("_", "-")
            k = "-".join(s.capitalize() for s in k.split("-"))
            v = v.strip()
            if k in result:
                result[k] += ", " + v
            else:
                result[k] = v
        self.headers = result

    @classmethod
    def parse_from_wire(cls, headers):
        """
        Parse and normalize an incoming HTTP message.

        headers is the raw header block (first line plus header lines,
        CRLF-separated, without the terminating blank line).  Returns a
        new instance of cls with the first line and headers filled in.
        """
        self = cls()
        lines = headers.split("\r\n")
        self.parse_first_line(*lines.pop(0).split(None, 2))
        # Fold continuation lines (those starting with whitespace) into
        # the line before them.  Walking backwards lets a header that
        # spans several lines collapse in one pass.  Slicing with [:1]
        # rather than indexing with [0] keeps an empty line from raising
        # IndexError.
        for i in range(len(lines) - 2, -1, -1):
            if lines[i + 1][:1].isspace():
                lines[i] += lines[i + 1]
                del lines[i + 1]
        # Blank lines carry no header and would break the name/value
        # unpacking in normalize_headers(), so drop them.
        self.normalize_headers([h.split(":", 1) for h in lines if h])
        return self

    def format(self):
        """
        Format an outgoing HTTP message and return it as a string.

        If there is a body, a Content-Length header is set from it.
        """
        # The first line must be formatted before the headers are
        # emitted: format_first_line() may add default headers.
        pieces = [self.format_first_line()]
        if self.body is not None:
            assert isinstance(self.body, str)
            self.headers["Content-Length"] = len(self.body)
        pieces.extend("%s: %s\r\n" % kv for kv in self.headers.items())
        pieces.append("\r\n")
        if self.body is not None:
            pieces.append(self.body)
        return "".join(pieces)

    def __str__(self):
        return self.format()

    def parse_version(self, version):
        """
        Parse HTTP version string ("HTTP/x.y") into a tuple of ints in
        self.version; raise HTTPSBadVersion if the prefix is wrong.
        """
        if version[:5] != "HTTP/":
            raise rpki.exceptions.HTTPSBadVersion("Couldn't parse version %s" % version)
        self.version = tuple(int(i) for i in version[5:].split("."))

    def persistent(self):
        """
        Figure out whether this HTTP message encourages a persistent
        connection: HTTP/1.1 is persistent unless Connection mentions
        "close", HTTP/1.0 only if Connection mentions "keep-alive", and
        any other version never is.
        """
        c = self.headers.get("Connection")
        if self.version == (1, 1):
            return c is None or "close" not in c.lower()
        elif self.version == (1, 0):
            return c is not None and "keep-alive" in c.lower()
        else:
            return False
00177
class http_request(http_message):
    """
    HTTP request message: a command, a path, and the callbacks that
    the caller attached for delivering the eventual outcome.
    """

    def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
        # A body is only acceptable on a POST.
        assert body is None or cmd == "POST"
        http_message.__init__(self, version = version, body = body, headers = headers)
        self.cmd, self.path = cmd, path
        self.callback, self.errback = callback, errback
        self.retried = False

    def parse_first_line(self, cmd, path, version):
        """
        Record the three fields of an HTTP request line.
        """
        self.parse_version(version)
        self.cmd, self.path = cmd, path

    def format_first_line(self):
        """
        Return the request line, making sure a User-Agent header is
        present as a side effect.
        """
        self.headers.setdefault("User-Agent", self.software_name)
        major = self.version[0]
        minor = self.version[1]
        return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, major, minor)
00207
class http_response(http_message):
    """
    HTTP response message: a status code and reason phrase on top of
    the headers and body handled by http_message.
    """

    def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
        http_message.__init__(self, version = version, body = body, headers = headers)
        self.code = code
        self.reason = reason

    def parse_first_line(self, version, code, reason):
        """
        Parse first line of HTTP response message.  The status code is
        converted to an int.
        """
        self.parse_version(version)
        self.code = int(code)
        self.reason = reason

    def format_first_line(self):
        """
        Format first line of HTTP response message, and set up Date and
        Server headers unless the caller already supplied them.
        """
        # The header says "GMT", so the timestamp must come from
        # gmtime(); strftime() without a time tuple formats local time.
        # %H:%M:%S is spelled out because %T is not available on every
        # platform.
        self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
        self.headers.setdefault("Server", self.software_name)
        return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00234
def log_method(self, msg, logger = rpki.log.debug):
    """
    Shared logging helper, installed as the .log() method of several
    classes in this module.  Messages aimed at rpki.log.debug are
    dropped unless debug_http is set; any other logger always fires.
    """
    assert isinstance(logger, rpki.log.logger)
    if logger is rpki.log.debug and not debug_http:
        return
    logger("%r: %s" % (self, msg))
00242
00243 class http_stream(asynchat.async_chat):
00244 """
00245 Virtual class representing an HTTP message stream.
00246 """
00247
00248 log = log_method
00249 tls = None
00250 retry_read = None
00251 retry_write = None
00252
00253 def __init__(self, sock = None):
00254 asynchat.async_chat.__init__(self, sock)
00255 self.buffer = []
00256 self.timer = rpki.async.timer(self.handle_timeout)
00257 self.restart()
00258
00259 def restart(self):
00260 """
00261 (Re)start HTTP message parser, reset timer.
00262 """
00263 assert not self.buffer
00264 self.chunk_handler = None
00265 self.set_terminator("\r\n\r\n")
00266 self.update_timeout()
00267
00268 def update_timeout(self):
00269 """
00270 Put this stream's timer in known good state: set it to the
00271 stream's timeout value if we're doing timeouts, otherwise clear
00272 it.
00273 """
00274 if self.timeout is not None:
00275 self.timer.set(self.timeout)
00276 else:
00277 self.timer.cancel()
00278
00279 def collect_incoming_data(self, data):
00280 """
00281 Buffer incoming data from asynchat.
00282 """
00283 self.buffer.append(data)
00284 self.update_timeout()
00285
00286 def get_buffer(self):
00287 """
00288 Consume data buffered from asynchat.
00289 """
00290 val = "".join(self.buffer)
00291 self.buffer = []
00292 return val
00293
00294 def found_terminator(self):
00295 """
00296 Asynchat reported that it found whatever terminator we set, so
00297 figure out what to do next. This can be messy, because we can be
00298 in any of several different states:
00299
00300 @li We might be handling chunked HTTP, in which case we have to
00301 initialize the chunk decoder;
00302
00303 @li We might have found the end of the message body, in which case
00304 we can (finally) process it; or
00305
00306 @li We might have just gotten to the end of the message headers,
00307 in which case we have to parse them to figure out which of three
00308 separate mechanisms (chunked, content-length, TCP close) is going
00309 to tell us how to find the end of the message body.
00310 """
00311 self.update_timeout()
00312 if self.chunk_handler:
00313 self.chunk_handler()
00314 elif not isinstance(self.get_terminator(), str):
00315 self.handle_body()
00316 else:
00317 self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00318 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00319 self.msg.body = []
00320 self.chunk_handler = self.chunk_header
00321 self.set_terminator("\r\n")
00322 elif "Content-Length" in self.msg.headers:
00323 self.set_terminator(int(self.msg.headers["Content-Length"]))
00324 else:
00325 self.handle_no_content_length()
00326
00327 def chunk_header(self):
00328 """
00329 Asynchat just handed us what should be the header of one chunk of
00330 a chunked encoding stream. If this chunk has a body, set the
00331 stream up to read it; otherwise, this is the last chunk, so start
00332 the process of exiting the chunk decoder.
00333 """
00334 n = int(self.get_buffer().partition(";")[0], 16)
00335 self.log("Chunk length %s" % n)
00336 if n:
00337 self.chunk_handler = self.chunk_body
00338 self.set_terminator(n)
00339 else:
00340 self.msg.body = "".join(self.msg.body)
00341 self.chunk_handler = self.chunk_discard_trailer
00342
00343 def chunk_body(self):
00344 """
00345 Asynchat just handed us what should be the body of a chunk of the
00346 body of a chunked message (sic). Save it, and prepare to move on
00347 to the next chunk.
00348 """
00349 self.log("Chunk body")
00350 self.msg.body += self.buffer
00351 self.buffer = []
00352 self.chunk_handler = self.chunk_discard_crlf
00353 self.set_terminator("\r\n")
00354
00355 def chunk_discard_crlf(self):
00356 """
00357 Consume the CRLF that terminates a chunk, reinitialize chunk
00358 decoder to be ready for the next chunk.
00359 """
00360 self.log("Chunk CRLF")
00361 s = self.get_buffer()
00362 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00363 self.chunk_handler = self.chunk_header
00364
00365 def chunk_discard_trailer(self):
00366 """
00367 Consume chunk trailer, which should be empty, then (finally!) exit
00368 the chunk decoder and hand complete message off to the application.
00369 """
00370 self.log("Chunk trailer")
00371 s = self.get_buffer()
00372 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00373 self.chunk_handler = None
00374 self.handle_message()
00375
00376 def handle_body(self):
00377 """
00378 Hand normal (not chunked) message off to the application.
00379 """
00380 self.msg.body = self.get_buffer()
00381 self.handle_message()
00382
00383 def handle_error(self):
00384 """
00385 Asynchat (or asyncore, or somebody) raised an exception. See
00386 whether it's one we should just pass along, otherwise log a stack
00387 trace and close the stream.
00388 """
00389 etype = sys.exc_info()[0]
00390 if etype in (SystemExit, rpki.async.ExitNow):
00391 self.log("Caught %s, propagating" % etype.__name__)
00392 raise
00393 self.log("Error in HTTP stream handler", rpki.log.warn)
00394 rpki.log.traceback()
00395 if etype not in (rpki.exceptions.HTTPSClientAborted,):
00396 self.log("Closing due to error", rpki.log.warn)
00397 self.close(force = True)
00398
00399 def handle_timeout(self):
00400 """
00401 Inactivity timer expired, close connection with prejudice.
00402 """
00403 self.log("Timeout, closing")
00404 self.close(force = True)
00405
00406 def handle_close(self):
00407 """
00408 Wrapper around asynchat connection close handler, so that we can
00409 log the event.
00410 """
00411 self.log("Close event in HTTP stream handler")
00412 asynchat.async_chat.handle_close(self)
00413
00414 def send(self, data):
00415 """
00416 TLS replacement for normal asyncore .send() method. Throw an
00417 exception if TLS hasn't been started or if TLS I/O was already in
00418 progress, otherwise hand off to the TLS code.
00419 """
00420 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00421 assert self.tls is not None
00422 return self.tls.write(data)
00423
00424 def recv(self, buffer_size):
00425 """
00426 TLS replacement for normal asyncore .recv() method. Throw an
00427 exception if TLS hasn't been started or if TLS I/O was already in
00428 progress, otherwise hand off to the TLS code.
00429 """
00430 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00431 assert self.tls is not None
00432 return self.tls.read(buffer_size)
00433
00434 def readable(self):
00435 """
00436 TLS replacement for normal asynchat .readable() method. A TLS
00437 connection that's blocked waiting for TLS write is considered not
00438 readable even if the underlying socket is.
00439 """
00440 return self.retry_read is not None or (self.retry_write is None and asynchat.async_chat.readable(self))
00441
00442 def writeable(self):
00443 """
00444 TLS replacement for normal asynchat .writeable() method. A TLS
00445 connection that's blocked waiting for TLS read is considered not
00446 writeable even if the underlying socket is.
00447 """
00448 return self.retry_write is not None or (self.retry_read is None and asynchat.async_chat.writeable(self))
00449
00450 def handle_read(self):
00451 """
00452 Asyncore says socket is readable. Make sure there's no TLS write
00453 already in progress, retry previous read operation if we had one
00454 that was waiting for more input, otherwise try to read some data,
00455 and handle all the weird OpenSSL exceptions that the TLS code
00456 throws.
00457 """
00458 assert self.retry_write is None, "%r: TLS I/O already in progress, w %r" % (self, self.retry_write)
00459 if self.retry_read is not None:
00460 thunk = self.retry_read
00461 self.retry_read = None
00462 self.log("Retrying TLS read %r" % thunk)
00463 thunk()
00464 else:
00465 try:
00466 asynchat.async_chat.handle_read(self)
00467 except POW.WantReadError:
00468 self.retry_read = self.handle_read
00469 except POW.WantWriteError:
00470 self.retry_write = self.handle_read
00471 except POW.ZeroReturnError:
00472 self.log("ZeroReturn in handle_read()")
00473 self.close()
00474 except POW.SSLUnexpectedEOFError:
00475 self.log("SSLUnexpectedEOF in handle_read()", rpki.log.warn)
00476 self.close(force = True)
00477
00478 def handle_write(self):
00479 """
00480 Asyncore says socket is writeable. Make sure there's no TLS read
00481 already in progress, retry previous write operation if we had one
00482 that was blocked on the socket, otherwise try to write some data.
00483 Handling all the weird OpenSSL exceptions that TLS throws is our
00484 caller's problem.
00485 """
00486
00487
00488
00489
00490
00491 if self.retry_read is not None:
00492 self.log("TLS I/O already in progress, r %r" % self.retry_read)
00493 return
00494 if self.retry_write is not None:
00495 thunk = self.retry_write
00496 self.retry_write = None
00497 thunk()
00498 self.log("Retrying TLS write %r" % thunk)
00499 else:
00500 asynchat.async_chat.handle_write(self)
00501
00502 def initiate_send(self):
00503 """
00504 Initiate a write operation. This is just a wrapper around the
00505 asynchat method, to handle all the whacky TLS exceptions.
00506 """
00507 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00508 try:
00509 asynchat.async_chat.initiate_send(self)
00510 except POW.WantReadError:
00511 self.retry_read = self.initiate_send
00512 except POW.WantWriteError:
00513 self.retry_write = self.initiate_send
00514 except POW.ZeroReturnError:
00515 self.log("ZeroReturn in initiate_send()")
00516 self.close()
00517 except POW.SSLUnexpectedEOFError:
00518 self.log("SSLUnexpectedEOF in initiate_send()", rpki.log.warn)
00519 self.close(force = True)
00520
00521 def close(self, force = False):
00522 """
00523 Close the stream.
00524
00525 Graceful shutdown of a TLS connection requires multiple calls to
00526 the underlying TLS code. If the connection should be closed right
00527 now without waiting (perhaps because it's already dead and we're
00528 just cleaning up), call with force = True.
00529 """
00530 self.log("Close requested")
00531 assert force or (self.retry_read is None and self.retry_write is None), "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00532 if self.tls is not None:
00533 try:
00534 if self.retry_read is None and self.retry_write is None:
00535 ret = self.tls.shutdown()
00536 else:
00537 ret = None
00538 self.log("tls.shutdown() returned %s, force_shutdown %s" % (ret, force))
00539 if ret or force:
00540 self.tls = None
00541 except POW.WantReadError:
00542 self.retry_read = self.close
00543 except POW.WantWriteError:
00544 self.retry_write = self.close
00545 except POW.SSLError, e:
00546 self.log("tls.shutdown() threw %s, shutting down anyway" % e)
00547 self.tls = None
00548 if self.tls is None:
00549 self.log("TLS layer is done, closing socket")
00550 self.timer.cancel()
00551 self.timer.set_handler(None)
00552 asynchat.async_chat.close(self)
00553
00554 def log_cert(self, tag, x):
00555 """
00556 Log HTTPS certificates, if certificate debugging is enabled.
00557 """
00558 if debug_tls_certs:
00559 rpki.log.debug("%r: HTTPS %s cert %r issuer %s [%s] subject %s [%s]" % (self, tag, x, x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI()))
00560
00561 class http_server(http_stream):
00562 """
00563 HTTP(S) server stream.
00564 """
00565
00566
00567
00568 parse_type = http_request
00569
00570
00571
00572 timeout = default_server_timeout
00573
00574 def __init__(self, sock, handlers, cert = None, key = None, ta = (), dynamic_ta = None):
00575 self.log("Starting")
00576 self.handlers = handlers
00577 http_stream.__init__(self, sock = sock)
00578 self.expect_close = not want_persistent_server
00579
00580 self.log("cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00581
00582 self.tls = POW.Ssl(POW.TLSV1_SERVER_METHOD)
00583 self.log_cert("server", cert)
00584 self.tls.useCertificate(cert.get_POW())
00585 self.tls.useKey(key.get_POW())
00586 ta = rpki.x509.X509.normalize_chain(dynamic_ta() if dynamic_ta else ta)
00587 assert ta
00588 for x in ta:
00589 self.log_cert("trusted", x)
00590 self.tls.addTrust(x.get_POW())
00591 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00592
00593 self.tls.setFd(self.fileno())
00594 self.tls_accept()
00595
00596 def tls_accept(self):
00597 """
00598 Set up TLS for server side connection, handling all the whacky
00599 OpenSSL exceptions from TLS.
00600
00601 SSLErrorSSLError exceptions are particularly nasty, because all
00602 too often they indicate a certificate lookup failure deep within
00603 the guts of OpenSSL's TLS connection setup logic. Extracting
00604 anything resembling a Python data structure from a handler called
00605 that deep inside the OpenSSL TLS library, while theoretically
00606 possible, runs a high risk of triggering some kind of memory leak
00607 or corruption. So, for now, we just get back a long text string,
00608 which we break up and log but don't attempt to process further.
00609 """
00610 try:
00611 self.tls.accept()
00612 except POW.WantReadError:
00613 self.retry_read = self.tls_accept
00614 except POW.WantWriteError:
00615 self.retry_write = self.tls_accept
00616 except POW.SSLUnexpectedEOFError:
00617 self.log("SSLUnexpectedEOF in tls_accept()")
00618 self.close(force = True)
00619 except POW.SSLErrorSSLError, e:
00620 if "\n" in e:
00621 for line in str(e).splitlines():
00622 rpki.log.error(line)
00623 raise POW.SSLErrorSSLError, "TLS certificate problem, most likely"
00624 else:
00625 raise
00626
00627 def handle_no_content_length(self):
00628 """
00629 Handle an incoming message that used neither chunking nor a
00630 Content-Length header (that is: this message will be the last one
00631 in this server stream). No special action required.
00632 """
00633 self.handle_message()
00634
00635 def find_handler(self, path):
00636 """
00637 Helper method to search self.handlers.
00638 """
00639 for s, h in self.handlers:
00640 if path.startswith(s):
00641 return h
00642 return None
00643
00644 def handle_message(self):
00645 """
00646 TLS and HTTP layers managed to deliver a complete HTTP request to
00647 us, figure out what to do with it. Check the command and
00648 Content-Type, look for a handler, and if everything looks right,
00649 pass the message body, path, and a reply callback to the handler.
00650 """
00651 self.log("Received request %s %s" % (self.msg.cmd, self.msg.path))
00652 if not self.msg.persistent():
00653 self.expect_close = True
00654 handler = self.find_handler(self.msg.path)
00655 error = None
00656 if self.msg.cmd != "POST":
00657 error = 501, "No handler for method %s" % self.msg.cmd
00658 elif self.msg.headers["Content-Type"] != rpki_content_type:
00659 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00660 elif handler is None:
00661 error = 404, "No handler for URL %s" % self.msg.path
00662 if error is None:
00663 try:
00664 handler(self.msg.body, self.msg.path, self.send_reply)
00665 except (rpki.async.ExitNow, SystemExit):
00666 raise
00667 except Exception, e:
00668 rpki.log.traceback()
00669 self.send_error(500, "Unhandled exception %s" % e)
00670 else:
00671 self.send_error(code = error[0], reason = error[1])
00672
00673 def send_error(self, code, reason):
00674 """
00675 Send an error response to this request.
00676 """
00677 self.send_message(code = code, reason = reason)
00678
00679 def send_reply(self, code, body):
00680 """
00681 Send a reply to this request.
00682 """
00683 self.send_message(code = code, body = body)
00684
00685 def send_message(self, code, reason = "OK", body = None):
00686 """
00687 Queue up reply message. If both parties agree that connection is
00688 persistant, and if no error occurred, restart this stream to
00689 listen for next message; otherwise, queue up a close event for
00690 this stream so it will shut down once the reply has been sent.
00691 """
00692 self.log("Sending response %s %s" % (code, reason))
00693 if code >= 400:
00694 self.expect_close = True
00695 msg = http_response(code = code, reason = reason, body = body,
00696 Content_Type = rpki_content_type,
00697 Connection = "Close" if self.expect_close else "Keep-Alive")
00698 self.push(msg.format())
00699 if self.expect_close:
00700 self.log("Closing")
00701 self.timer.cancel()
00702 self.close_when_done()
00703 else:
00704 self.log("Listening for next message")
00705 self.restart()
00706
00707 class http_listener(asyncore.dispatcher):
00708 """
00709 Listener for incoming HTTP(S) connections.
00710 """
00711
00712 log = log_method
00713
00714 def __init__(self, handlers, port = 80, host = "", cert = None, key = None, ta = None, dynamic_ta = None, af = supported_address_families[0]):
00715 self.log("Listener cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00716 asyncore.dispatcher.__init__(self)
00717 self.handlers = handlers
00718 self.cert = cert
00719 self.key = key
00720 self.ta = ta
00721 self.dynamic_ta = dynamic_ta
00722 try:
00723 self.create_socket(af, socket.SOCK_STREAM)
00724 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00725 if hasattr(socket, "SO_REUSEPORT"):
00726 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00727 self.bind((host, port))
00728 self.listen(5)
00729 except (rpki.async.ExitNow, SystemExit):
00730 raise
00731 except:
00732 self.handle_error()
00733 self.log("Listening on %r, handlers %r" % ((host, port), handlers))
00734
00735 def handle_accept(self):
00736 """
00737 Asyncore says we have an incoming connection, spawn an http_server
00738 stream for it and pass along all of our handler and TLS data.
00739 """
00740 self.log("Accepting connection")
00741 try:
00742 http_server(sock = self.accept()[0], handlers = self.handlers, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta)
00743 except (rpki.async.ExitNow, SystemExit):
00744 raise
00745 except:
00746 self.handle_error()
00747
00748 def handle_error(self):
00749 """
00750 Asyncore signaled an error, pass it along or log it.
00751 """
00752 if sys.exc_info()[0] is SystemExit:
00753 self.log("Caught SystemExit, propagating")
00754 raise
00755 else:
00756 self.log("Error in HTTP listener", rpki.log.warn)
00757 rpki.log.traceback()
00758
00759 class http_client(http_stream):
00760 """
00761 HTTP(S) client stream.
00762 """
00763
00764
00765
00766 parse_type = http_response
00767
00768
00769
00770 timeout = default_client_timeout
00771
00772 def __init__(self, queue, hostport, cert = None, key = None, ta = (), af = supported_address_families[0]):
00773 self.log("Creating new connection to %r" % (hostport,))
00774 self.log("cert %r key %r ta %r" % (cert, key, ta))
00775 http_stream.__init__(self)
00776 self.queue = queue
00777 self.hostport = hostport
00778 self.state = "opening"
00779 self.expect_close = not want_persistent_client
00780 self.cert = cert
00781 self.key = key
00782 self.ta = rpki.x509.X509.normalize_chain(ta)
00783 self.af = af
00784
00785 def start(self):
00786 """
00787 Create socket and request a connection.
00788 """
00789 try:
00790 self.create_socket(self.af, socket.SOCK_STREAM)
00791 self.connect(self.hostport)
00792 except (rpki.async.ExitNow, SystemExit):
00793 raise
00794 except:
00795 self.handle_error()
00796
00797 def handle_connect(self):
00798 """
00799 Asyncore says socket has connected, configure TLS junk.
00800 """
00801 self.log("Socket connected")
00802 self.tls = POW.Ssl(POW.TLSV1_CLIENT_METHOD)
00803 self.log_cert("client", self.cert)
00804 self.tls.useCertificate(self.cert.get_POW())
00805 self.tls.useKey(self.key.get_POW())
00806 assert self.ta
00807 for x in self.ta:
00808 self.log_cert("trusted", x)
00809 self.tls.addTrust(x.get_POW())
00810 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00811 self.tls.setFd(self.fileno())
00812 self.tls_connect()
00813
00814 def tls_connect(self):
00815 """
00816 Initialize client side of TLS.
00817 """
00818 try:
00819 self.tls.connect()
00820 except POW.WantReadError:
00821 self.retry_read = self.tls_connect
00822 except POW.WantWriteError:
00823 self.retry_write = self.tls_connect
00824 else:
00825 self.log("TLS connected")
00826 self.set_state("idle")
00827 self.queue.send_request()
00828
00829 def set_state(self, state):
00830 """
00831 Set HTTP client connection state.
00832 """
00833 self.log("State transition %s => %s" % (self.state, state))
00834 self.state = state
00835
00836 def handle_no_content_length(self):
00837 """
00838 Handle response message that used neither chunking nor a
00839 Content-Length header (that is: this message will be the last one
00840 in this server stream). In this case we want to read until we
00841 reach the end of the data stream.
00842 """
00843 self.set_terminator(None)
00844
00845 def send_request(self, msg):
00846 """
00847 Queue up request message and kickstart connection.
00848 """
00849 self.log("Sending request %r" % msg)
00850 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00851 self.set_state("request-sent")
00852 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00853 self.push(msg.format())
00854 self.restart()
00855
00856 def handle_message(self):
00857 """
00858 Handle incoming HTTP response message. Make sure we're in a state
00859 where we expect to see such a message (and allow the mysterious
00860 empty messages that Apache sends during connection close, no idea
00861 what that is supposed to be about). If everybody agrees that the
00862 connection should stay open, put it into an idle state; otherwise,
00863 arrange for the stream to shut down.
00864 """
00865
00866 self.log("Message received, state %s" % self.state)
00867
00868 if not self.msg.persistent():
00869 self.expect_close = True
00870
00871 if self.state != "request-sent":
00872 if self.state == "closing":
00873 assert not self.msg.body
00874 self.log("Ignoring empty response received while closing")
00875 return
00876 raise rpki.exceptions.HTTPSUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00877
00878 if self.expect_close:
00879 self.log("Closing")
00880 self.set_state("closing")
00881 self.queue.detach(self)
00882 self.close_when_done()
00883 else:
00884 self.log("Idling")
00885 self.set_state("idle")
00886 self.update_timeout()
00887
00888 if self.msg.code != 200:
00889 raise rpki.exceptions.HTTPRequestFailed, "HTTPS request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00890 self.queue.return_result(self.msg)
00891
00892 def handle_close(self):
00893 """
00894 Asyncore signaled connection close. If we were waiting for that
00895 to find the end of a response message, process the resulting
00896 message now; if we were waiting for the response to a request we
00897 sent, signal the error.
00898 """
00899 http_stream.handle_close(self)
00900 self.log("State %s" % self.state)
00901 self.queue.detach(self)
00902 if self.get_terminator() is None:
00903 self.handle_body()
00904 elif self.state == "request-sent":
00905 raise rpki.exceptions.HTTPSClientAborted, "HTTPS request aborted by close event"
00906
00907 def handle_timeout(self):
00908 """
00909 Connection idle timer has expired. If we weren't idle, log that
00910 something bad has happened, then shut down the connection in any
00911 case.
00912 """
00913 if self.state != "idle":
00914 self.log("Timeout while in state %s" % self.state)
00915 http_stream.handle_timeout(self)
00916 self.queue.detach(self)
00917
00918 def handle_error(self):
00919 """
00920 Asyncore says something threw an exception. Log it, then shut
00921 down the connection and pass back the exception.
00922 """
00923 eclass, edata = sys.exc_info()[0:2]
00924 self.log("Error on HTTP client connection %r: %s %s" % (self.hostport, eclass, edata), rpki.log.warn)
00925 http_stream.handle_error(self)
00926 self.queue.detach(self)
00927 self.queue.return_result(edata)
00928
00929 class http_queue(object):
00930 """
00931 Queue of pending HTTP requests for a single destination. This class
00932 is very tightly coupled to http_client; http_client handles the HTTP
00933 stream itself, this class provides a slightly higher-level API.
00934 """
00935
00936 log = log_method
00937
00938 def __init__(self, hostport, cert = None, key = None, ta = ()):
00939 self.log("Creating queue for %r" % (hostport,))
00940 self.log("cert %r key %r ta %r" % (cert, key, ta))
00941 self.hostport = hostport
00942 self.client = None
00943 self.queue = []
00944 self.cert = cert
00945 self.key = key
00946 self.ta = ta
00947
00948 def request(self, *requests):
00949 """
00950 Append http_request object(s) to this queue.
00951 """
00952 self.log("Adding requests %r" % requests)
00953 self.queue.extend(requests)
00954
00955 def restart(self):
00956 """
00957 Send next request for this queue, if we can. This may involve
00958 starting a new http_client stream, reusing an existing idle
00959 stream, or just ignoring this request if there's an active client
00960 stream already; in the last case, handling of the response (or
00961 exception, or timeout) for the query currently in progress will
00962 call this method when it's time to kick out the next query.
00963 """
00964 try:
00965 if self.client is None:
00966 self.client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta)
00967 self.log("Attached client %r" % self.client)
00968 self.client.start()
00969 elif self.client.state == "idle":
00970 self.log("Sending request to existing client %r" % self.client)
00971 self.send_request()
00972 else:
00973 self.log("Client %r exists in state %r" % (self.client, self.client.state))
00974 except (rpki.async.ExitNow, SystemExit):
00975 raise
00976 except Exception, e:
00977 self.return_result(e)
00978
00979 def send_request(self):
00980 """
00981 Kick out the next query in this queue, if any.
00982 """
00983 if self.queue:
00984 self.client.send_request(self.queue[0])
00985
00986 def detach(self, client_):
00987 """
00988 Detatch a client from this queue. Silently ignores attempting to
00989 detach a client that is not attached to this queue, to simplify
00990 handling of what otherwise would be a nasty set of race
00991 conditions.
00992 """
00993 if client_ is self.client:
00994 self.log("Detaching client %r" % client_)
00995 self.client = None
00996
00997 def return_result(self, result):
00998 """
00999 Client stream has returned a result, which we need to pass along
01000 to the original caller. Result may be either an HTTP response
01001 message or an exception. In either case, once we're done
01002 processing this result, kick off next message in the queue, if any.
01003 """
01004
01005 if not self.queue:
01006 self.log("No caller, this should not happen. Dropping result %r" % result)
01007
01008 req = self.queue.pop(0)
01009 self.log("Dequeuing request %r" % req)
01010
01011 try:
01012 if isinstance(result, http_response):
01013 self.log("Returning result %r to caller" % result)
01014 req.callback(result.body)
01015 else:
01016 assert isinstance(result, Exception)
01017 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
01018 req.errback(result)
01019 except (rpki.async.ExitNow, SystemExit):
01020 raise
01021 except:
01022 self.log("Unhandled exception from callback")
01023 rpki.log.traceback()
01024
01025 self.log("Queue: %r" % self.queue)
01026
01027 if self.queue:
01028 self.restart()
01029
01030
01031
# Registry of http_queue objects keyed by (host, port) tuple.  client()
# adds an entry the first time a destination is used and reuses it for
# later requests to the same destination.
client_queues = {}
01033
01034 def client(msg, client_key, client_cert, server_ta, url, callback, errback):
01035 """
01036 Open client HTTPS connection, send a message, set up callbacks to
01037 handle response.
01038 """
01039
01040 u = urlparse.urlparse(url)
01041
01042 if (u.scheme not in ("", "https") or
01043 u.username is not None or
01044 u.password is not None or
01045 u.params != "" or
01046 u.query != "" or
01047 u.fragment != ""):
01048 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
01049
01050 if debug_http:
01051 rpki.log.debug("Contacting %s" % url)
01052
01053 request = http_request(
01054 cmd = "POST",
01055 path = u.path,
01056 body = msg,
01057 callback = callback,
01058 errback = errback,
01059 Host = u.hostname,
01060 Content_Type = rpki_content_type)
01061
01062 hostport = (u.hostname or "localhost", u.port or 80)
01063
01064 if debug_http:
01065 rpki.log.debug("Created request %r for %r" % (request, hostport))
01066 if not isinstance(server_ta, (tuple, list)):
01067 server_ta = (server_ta,)
01068 if hostport not in client_queues:
01069 client_queues[hostport] = http_queue(hostport, cert = client_cert, key = client_key, ta = server_ta)
01070 client_queues[hostport].request(request)
01071
01072
01073
01074
01075 if debug_http:
01076 rpki.log.debug("Scheduling connection startup for %r" % request)
01077 rpki.async.defer(client_queues[hostport].restart)
01078
01079 def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None, address_families = supported_address_families):
01080 """
01081 Run an HTTPS server and wait (forever) for connections.
01082 """
01083
01084 if not isinstance(handlers, (tuple, list)):
01085 handlers = (("/", handlers),)
01086
01087 if not isinstance(client_ta, (tuple, list)):
01088 client_ta = (client_ta,)
01089
01090 for af in address_families:
01091 http_listener(port = port, host = host, handlers = handlers, cert = server_cert, key = server_key, ta = client_ta, dynamic_ta = dynamic_https_trust_anchor, af = af)
01092 rpki.async.event_loop()
01093
def build_https_ta_cache(certs):
    """
    Bundle a collection of certificates into the form used as a dynamic
    HTTPS trust anchor set.

    The exact representation is private to the rpki.https module and
    may change without notice; at present it is a Python set holding
    the items of certs.
    """

    ta_cache = set()
    ta_cache.update(certs)
    return ta_cache
01104
01105 class caller(object):
01106 """
01107 Handle client-side mechanics for protocols based on HTTPS, CMS, and
01108 rpki.xml_utils. Calling sequence is intended to nest within
01109 rpki.async.sync_wrapper.
01110 """
01111
01112 debug = False
01113
01114 def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
01115 self.proto = proto
01116 self.client_key = client_key
01117 self.client_cert = client_cert
01118 self.server_ta = server_ta
01119 self.server_cert = server_cert
01120 self.url = url
01121 if debug is not None:
01122 self.debug = debug
01123
01124 def __call__(self, cb, eb, *pdus):
01125
01126 def done(cms):
01127 """
01128 Handle CMS-wrapped XML response message.
01129 """
01130 result = self.proto.cms_msg.unwrap(cms, (self.server_ta, self.server_cert), pretty_print = self.debug)
01131 if self.debug:
01132 msg, xml = result
01133 print "<!-- Reply -->"
01134 print xml
01135 else:
01136 msg = result
01137 cb(msg)
01138
01139 msg = self.proto.msg.query(*pdus)
01140 result = self.proto.cms_msg.wrap(msg, self.client_key, self.client_cert, pretty_print = self.debug)
01141 if self.debug:
01142 cms, xml = result
01143 print "<!-- Query -->"
01144 print xml
01145 else:
01146 cms = result
01147
01148 client(
01149 client_key = self.client_key,
01150 client_cert = self.client_cert,
01151 server_ta = self.server_ta,
01152 url = self.url,
01153 msg = cms,
01154 callback = done,
01155 errback = eb)