00001 """
00002 HTTPS utilities, both client and server.
00003
00004 $Id: https.py 2902 2009-12-22 21:57:32Z sra $
00005
00006 Copyright (C) 2009 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019
00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
00021
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034
00035 import time, socket, asyncore, asynchat, urlparse, sys
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import POW
00038
00039 rpki_content_type = "application/x-rpki"
00040
00041
00042
00043
00044
00045 debug_http = False
00046
00047
00048 debug_tls_certs = False
00049
00050
00051 want_persistent_client = False
00052 want_persistent_server = False
00053
00054
00055
00056
00057
00058 default_client_timeout = rpki.sundial.timedelta(minutes = 15)
00059 default_server_timeout = rpki.sundial.timedelta(minutes = 20)
00060
00061 default_http_version = (1, 0)
00062
00063 class http_message(object):
00064
00065 software_name = "ISC RPKI library"
00066
00067 def __init__(self, version = None, body = None, headers = None):
00068 self.version = version
00069 self.body = body
00070 self.headers = headers
00071 self.normalize_headers()
00072
00073 def normalize_headers(self, headers = None):
00074 if headers is None:
00075 headers = () if self.headers is None else self.headers.items()
00076 translate_underscore = True
00077 else:
00078 translate_underscore = False
00079 result = {}
00080 for k, v in headers:
00081 if translate_underscore:
00082 k = k.replace("_", "-")
00083 k = "-".join(s.capitalize() for s in k.split("-"))
00084 v = v.strip()
00085 if k in result:
00086 result[k] += ", " + v
00087 else:
00088 result[k] = v
00089 self.headers = result
00090
00091 @classmethod
00092 def parse_from_wire(cls, headers):
00093 self = cls()
00094 headers = headers.split("\r\n")
00095 self.parse_first_line(*headers.pop(0).split(None, 2))
00096 for i in xrange(len(headers) - 2, -1, -1):
00097 if headers[i + 1][0].isspace():
00098 headers[i] += headers[i + 1]
00099 del headers[i + 1]
00100 self.normalize_headers([h.split(":", 1) for h in headers])
00101 return self
00102
00103 def format(self):
00104 s = self.format_first_line()
00105 if self.body is not None:
00106 assert isinstance(self.body, str)
00107 self.headers["Content-Length"] = len(self.body)
00108 for kv in self.headers.iteritems():
00109 s += "%s: %s\r\n" % kv
00110 s += "\r\n"
00111 if self.body is not None:
00112 s += self.body
00113 return s
00114
00115 def __str__(self):
00116 return self.format()
00117
00118 def parse_version(self, version):
00119 if version[:5] != "HTTP/":
00120 raise rpki.exceptions.HTTPSBadVersion, "Couldn't parse version %s" % version
00121 self.version = tuple(int(i) for i in version[5:].split("."))
00122
00123 def persistent(self):
00124 c = self.headers.get("Connection")
00125 if self.version == (1, 1):
00126 return c is None or "close" not in c.lower()
00127 elif self.version == (1, 0):
00128 return c is not None and "keep-alive" in c.lower()
00129 else:
00130 return False
00131
00132 class http_request(http_message):
00133
00134 def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
00135 assert cmd == "POST" or body is None
00136 http_message.__init__(self, version = version, body = body, headers = headers)
00137 self.cmd = cmd
00138 self.path = path
00139 self.callback = callback
00140 self.errback = errback
00141 self.retried = False
00142
00143 def parse_first_line(self, cmd, path, version):
00144 self.parse_version(version)
00145 self.cmd = cmd
00146 self.path = path
00147
00148 def format_first_line(self):
00149 self.headers.setdefault("User-Agent", self.software_name)
00150 return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
00151
00152 class http_response(http_message):
00153
00154 def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
00155 http_message.__init__(self, version = version, body = body, headers = headers)
00156 self.code = code
00157 self.reason = reason
00158
00159 def parse_first_line(self, version, code, reason):
00160 self.parse_version(version)
00161 self.code = int(code)
00162 self.reason = reason
00163
00164 def format_first_line(self):
00165 self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
00166 self.headers.setdefault("Server", self.software_name)
00167 return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00168
00169 def log_method(self, msg, logger = rpki.log.debug):
00170 assert isinstance(logger, rpki.log.logger)
00171 if debug_http or logger is not rpki.log.debug:
00172 logger("%r: %s" % (self, msg))
00173
00174 class http_stream(asynchat.async_chat):
00175
00176 log = log_method
00177 tls = None
00178 retry_read = None
00179 retry_write = None
00180
00181 def __init__(self, sock = None):
00182 asynchat.async_chat.__init__(self, sock)
00183 self.buffer = []
00184 self.timer = rpki.async.timer(self.handle_timeout)
00185 self.restart()
00186
00187 def restart(self):
00188 assert not self.buffer
00189 self.chunk_handler = None
00190 self.set_terminator("\r\n\r\n")
00191 if self.timeout is not None:
00192 self.timer.set(self.timeout)
00193 else:
00194 self.timer.cancel()
00195
00196 def update_timeout(self):
00197 if self.timeout is not None:
00198 self.timer.set(self.timeout)
00199 else:
00200 self.timer.cancel()
00201
00202 def collect_incoming_data(self, data):
00203 """
00204 Buffer the data
00205 """
00206 self.buffer.append(data)
00207 self.update_timeout()
00208
00209 def get_buffer(self):
00210 val = "".join(self.buffer)
00211 self.buffer = []
00212 return val
00213
00214 def found_terminator(self):
00215 self.update_timeout()
00216 if self.chunk_handler:
00217 self.chunk_handler()
00218 elif not isinstance(self.get_terminator(), str):
00219 self.handle_body()
00220 else:
00221 self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00222 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00223 self.msg.body = []
00224 self.chunk_handler = self.chunk_header
00225 self.set_terminator("\r\n")
00226 elif "Content-Length" in self.msg.headers:
00227 self.set_terminator(int(self.msg.headers["Content-Length"]))
00228 else:
00229 self.handle_no_content_length()
00230
00231 def chunk_header(self):
00232 n = int(self.get_buffer().partition(";")[0], 16)
00233 self.log("Chunk length %s" % n)
00234 if n:
00235 self.chunk_handler = self.chunk_body
00236 self.set_terminator(n)
00237 else:
00238 self.msg.body = "".join(self.msg.body)
00239 self.chunk_handler = self.chunk_discard_trailer
00240
00241 def chunk_body(self):
00242 self.log("Chunk body")
00243 self.msg.body += self.buffer
00244 self.buffer = []
00245 self.chunk_handler = self.chunk_discard_crlf
00246 self.set_terminator("\r\n")
00247
00248 def chunk_discard_crlf(self):
00249 self.log("Chunk CRLF")
00250 s = self.get_buffer()
00251 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00252 self.chunk_handler = self.chunk_header
00253
00254 def chunk_discard_trailer(self):
00255 self.log("Chunk trailer")
00256 s = self.get_buffer()
00257 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00258 self.chunk_handler = None
00259 self.handle_message()
00260
00261 def handle_body(self):
00262 self.msg.body = self.get_buffer()
00263 self.handle_message()
00264
00265 def handle_error(self):
00266 etype = sys.exc_info()[0]
00267 if etype in (SystemExit, rpki.async.ExitNow):
00268 self.log("Caught %s, propagating" % etype.__name__)
00269 raise
00270 self.log("Error in HTTP stream handler", rpki.log.warn)
00271 rpki.log.traceback()
00272 if etype not in (rpki.exceptions.HTTPSClientAborted,):
00273 self.log("Closing due to error", rpki.log.warn)
00274 self.close(force = True)
00275
00276 def handle_timeout(self):
00277 self.log("Timeout, closing")
00278 self.close()
00279
00280 def handle_close(self):
00281 self.log("Close event in HTTP stream handler")
00282 self.timer.cancel()
00283 self.timer.set_handler(None)
00284
00285 def send(self, data):
00286 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00287 assert self.tls is not None
00288 return self.tls.write(data)
00289
00290 def recv(self, buffer_size):
00291 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00292 assert self.tls is not None
00293 return self.tls.read(buffer_size)
00294
00295 def readable(self):
00296 return self.retry_read is not None or (self.retry_write is None and asynchat.async_chat.readable(self))
00297
00298 def writeable(self):
00299 return self.retry_write is not None or (self.retry_read is None and asynchat.async_chat.writeable(self))
00300
00301 def handle_read(self):
00302 assert self.retry_write is None, "%r: TLS I/O already in progress, w %r" % (self, self.retry_write)
00303 if self.retry_read is not None:
00304 thunk = self.retry_read
00305 self.retry_read = None
00306 self.log("Retrying TLS read %r" % thunk)
00307 thunk()
00308 else:
00309 try:
00310 asynchat.async_chat.handle_read(self)
00311 except POW.WantReadError:
00312 self.retry_read = self.handle_read
00313 except POW.WantWriteError:
00314 self.retry_write = self.handle_read
00315 except POW.ZeroReturnError:
00316 self.log("ZeroReturn in handle_read()")
00317 self.close()
00318 except POW.SSLUnexpectedEOFError:
00319 self.log("SSLUnexpectedEOF in handle_read()", rpki.log.warn)
00320 self.close(force = True)
00321
00322 def handle_write(self):
00323
00324
00325
00326
00327
00328
00329 if self.retry_read is not None:
00330 self.log("TLS I/O already in progress, r %r" % self.retry_read)
00331 return
00332
00333 if self.retry_write is not None:
00334 thunk = self.retry_write
00335 self.retry_write = None
00336 thunk()
00337 self.log("Retrying TLS write %r" % thunk)
00338 else:
00339 asynchat.async_chat.handle_write(self)
00340
00341 def initiate_send(self):
00342 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00343 try:
00344 asynchat.async_chat.initiate_send(self)
00345 except POW.WantReadError:
00346 self.retry_read = self.initiate_send
00347 except POW.WantWriteError:
00348 self.retry_write = self.initiate_send
00349 except POW.ZeroReturnError:
00350 self.log("ZeroReturn in initiate_send()")
00351 self.close()
00352 except POW.SSLUnexpectedEOFError:
00353 self.log("SSLUnexpectedEOF in initiate_send()", rpki.log.warn)
00354 self.close(force = True)
00355
00356 def close(self, force = False):
00357 self.log("Close requested")
00358 assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write)
00359 if self.tls is not None:
00360 try:
00361 ret = self.tls.shutdown()
00362 self.log("tls.shutdown() returned %s, force_shutdown %s" % (ret, force))
00363 if ret or force:
00364 self.tls = None
00365 asynchat.async_chat.close(self)
00366 self.handle_close()
00367 except POW.WantReadError:
00368 self.retry_read = self.close
00369 except POW.WantWriteError:
00370 self.retry_write = self.close
00371 except POW.SSLError, e:
00372 self.log("socket shutdown threw %s, shutting down anyway" % e)
00373 self.tls = None
00374 asynchat.async_chat.close(self)
00375 self.handle_close()
00376
00377 def log_cert(self, tag, x):
00378 if debug_tls_certs:
00379 rpki.log.debug("%r: HTTPS %s cert %r issuer %s [%s] subject %s [%s]" % (self, tag, x, x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI()))
00380
00381 class http_server(http_stream):
00382
00383 parse_type = http_request
00384
00385 timeout = default_server_timeout
00386
00387 def __init__(self, sock, handlers, cert = None, key = None, ta = (), dynamic_ta = None):
00388 self.log("Starting")
00389 self.handlers = handlers
00390 http_stream.__init__(self, sock = sock)
00391 self.expect_close = not want_persistent_server
00392
00393 self.log("cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00394
00395 self.tls = POW.Ssl(POW.TLSV1_SERVER_METHOD)
00396 self.log_cert("server", cert)
00397 self.tls.useCertificate(cert.get_POW())
00398 self.tls.useKey(key.get_POW())
00399 ta = rpki.x509.X509.normalize_chain(dynamic_ta() if dynamic_ta else ta)
00400 assert ta
00401 for x in ta:
00402 self.log_cert("trusted", x)
00403 self.tls.addTrust(x.get_POW())
00404 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00405
00406 self.tls.setFd(self.fileno())
00407 self.tls_accept()
00408
00409 def tls_accept(self):
00410 try:
00411 self.tls.accept()
00412 except POW.WantReadError:
00413 self.retry_read = self.tls_accept
00414 except POW.WantWriteError:
00415 self.retry_write = self.tls_accept
00416 except POW.SSLUnexpectedEOFError:
00417 self.log("SSLUnexpectedEOF in tls_accept()", rpki.log.warn)
00418 self.close(force = True)
00419
00420 def handle_no_content_length(self):
00421 self.handle_message()
00422
00423 def find_handler(self, path):
00424 """
00425 Helper method to search self.handlers.
00426 """
00427 for s, h in self.handlers:
00428 if path.startswith(s):
00429 return h
00430 return None
00431
00432 def handle_message(self):
00433 self.log("Received request %s %s" % (self.msg.cmd, self.msg.path))
00434 if not self.msg.persistent():
00435 self.expect_close = True
00436 handler = self.find_handler(self.msg.path)
00437 error = None
00438 if self.msg.cmd != "POST":
00439 error = 501, "No handler for method %s" % self.msg.cmd
00440 elif self.msg.headers["Content-Type"] != rpki_content_type:
00441 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00442 elif handler is None:
00443 error = 404, "No handler for URL %s" % self.msg.path
00444 if error is None:
00445 try:
00446 handler(self.msg.body, self.msg.path, self.send_reply)
00447 except (rpki.async.ExitNow, SystemExit):
00448 raise
00449 except Exception, e:
00450 rpki.log.traceback()
00451 self.send_error(500, "Unhandled exception %s" % e)
00452 else:
00453 self.send_error(code = error[0], reason = error[1])
00454
00455 def send_error(self, code, reason):
00456 self.send_message(code = code, reason = reason)
00457
00458 def send_reply(self, code, body):
00459 self.send_message(code = code, body = body)
00460
00461 def send_message(self, code, reason = "OK", body = None):
00462 self.log("Sending response %s %s" % (code, reason))
00463 if code >= 400:
00464 self.expect_close = True
00465 msg = http_response(code = code, reason = reason, body = body,
00466 Content_Type = rpki_content_type,
00467 Connection = "Close" if self.expect_close else "Keep-Alive")
00468 self.push(msg.format())
00469 if self.expect_close:
00470 self.log("Closing")
00471 self.timer.cancel()
00472 self.close_when_done()
00473 else:
00474 self.log("Listening for next message")
00475 self.restart()
00476
00477 class http_listener(asyncore.dispatcher):
00478
00479 log = log_method
00480
00481 def __init__(self, handlers, port = 80, host = "", cert = None, key = None, ta = None, dynamic_ta = None):
00482 self.log("Listener cert %r key %r ta %r dynamic_ta %r" % (cert, key, ta, dynamic_ta))
00483 asyncore.dispatcher.__init__(self)
00484 self.handlers = handlers
00485 self.cert = cert
00486 self.key = key
00487 self.ta = ta
00488 self.dynamic_ta = dynamic_ta
00489 try:
00490 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
00491 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00492 if hasattr(socket, "SO_REUSEPORT"):
00493 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00494 self.bind((host, port))
00495 self.listen(5)
00496 except (rpki.async.ExitNow, SystemExit):
00497 raise
00498 except:
00499 self.handle_error()
00500 self.log("Listening on %r, handlers %r" % ((host, port), handlers))
00501
00502 def handle_accept(self):
00503 self.log("Accepting connection")
00504 try:
00505 http_server(sock = self.accept()[0], handlers = self.handlers, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta)
00506 except (rpki.async.ExitNow, SystemExit):
00507 raise
00508 except:
00509 self.handle_error()
00510
00511 def handle_error(self):
00512 if sys.exc_info()[0] is SystemExit:
00513 self.log("Caught SystemExit, propagating")
00514 raise
00515 else:
00516 self.log("Error in HTTP listener", rpki.log.warn)
00517 rpki.log.traceback()
00518
00519 class http_client(http_stream):
00520
00521 parse_type = http_response
00522
00523 timeout = default_client_timeout
00524
00525 def __init__(self, queue, hostport, cert = None, key = None, ta = ()):
00526 self.log("Creating new connection to %r" % (hostport,))
00527 self.log("cert %r key %r ta %r" % (cert, key, ta))
00528 http_stream.__init__(self)
00529 self.queue = queue
00530 self.hostport = hostport
00531 self.state = "opening"
00532 self.expect_close = not want_persistent_client
00533 self.cert = cert
00534 self.key = key
00535 self.ta = rpki.x509.X509.normalize_chain(ta)
00536
00537 def start(self):
00538 try:
00539 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
00540 self.connect(self.hostport)
00541 except (rpki.async.ExitNow, SystemExit):
00542 raise
00543 except:
00544 self.handle_error()
00545
00546 def handle_connect(self):
00547 self.log("Socket connected")
00548 self.tls = POW.Ssl(POW.TLSV1_CLIENT_METHOD)
00549 self.log_cert("client", self.cert)
00550 self.tls.useCertificate(self.cert.get_POW())
00551 self.tls.useKey(self.key.get_POW())
00552 assert self.ta
00553 for x in self.ta:
00554 self.log_cert("trusted", x)
00555 self.tls.addTrust(x.get_POW())
00556 self.tls.setVerifyMode(POW.SSL_VERIFY_PEER | POW.SSL_VERIFY_FAIL_IF_NO_PEER_CERT)
00557 self.tls.setFd(self.fileno())
00558 self.tls_connect()
00559
00560 def tls_connect(self):
00561 try:
00562 self.tls.connect()
00563 except POW.WantReadError:
00564 self.retry_read = self.tls_connect
00565 except POW.WantWriteError:
00566 self.retry_write = self.tls_connect
00567 else:
00568 self.log("TLS connected")
00569 self.set_state("idle")
00570 self.queue.send_request()
00571
00572 def set_state(self, state):
00573 self.log("State transition %s => %s" % (self.state, state))
00574 self.state = state
00575
00576 def handle_no_content_length(self):
00577 self.set_terminator(None)
00578
00579 def send_request(self, msg):
00580 self.log("Sending request %r" % msg)
00581 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00582 self.set_state("request-sent")
00583 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00584 self.push(msg.format())
00585 self.restart()
00586
00587 def handle_message(self):
00588 self.log("Message received, state %s" % self.state)
00589
00590 if not self.msg.persistent():
00591 self.expect_close = True
00592
00593 if self.state != "request-sent":
00594 if self.state == "closing":
00595 assert not self.msg.body
00596 self.log("Ignoring empty response received while closing")
00597 return
00598 raise rpki.exceptions.HTTPSUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00599
00600 if self.expect_close:
00601 self.log("Closing")
00602 self.set_state("closing")
00603 self.queue.detach(self)
00604 self.close_when_done()
00605 else:
00606 self.log("Idling")
00607 self.set_state("idle")
00608 self.update_timeout()
00609
00610 if self.msg.code != 200:
00611 raise rpki.exceptions.HTTPRequestFailed, "HTTPS request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00612 self.queue.return_result(self.msg)
00613
00614 def handle_close(self):
00615 http_stream.handle_close(self)
00616 self.log("State %s" % self.state)
00617 self.queue.detach(self)
00618 if self.get_terminator() is None:
00619 self.handle_body()
00620 elif self.state == "request-sent":
00621 raise rpki.exceptions.HTTPSClientAborted, "HTTPS request aborted by close event"
00622
00623 def handle_timeout(self):
00624 if self.state != "idle":
00625 self.log("Timeout while in state %s" % self.state)
00626 http_stream.handle_timeout(self)
00627 self.queue.detach(self)
00628
00629 def handle_error(self):
00630 http_stream.handle_error(self)
00631 self.queue.detach(self)
00632 self.queue.return_result(sys.exc_info()[1])
00633
00634 class http_queue(object):
00635
00636 log = log_method
00637
00638 def __init__(self, hostport, cert = None, key = None, ta = ()):
00639 self.log("Creating queue for %r" % (hostport,))
00640 self.log("cert %r key %r ta %r" % (cert, key, ta))
00641 self.hostport = hostport
00642 self.client = None
00643 self.queue = []
00644 self.cert = cert
00645 self.key = key
00646 self.ta = ta
00647
00648 def request(self, *requests):
00649 self.log("Adding requests %r" % requests)
00650 self.queue.extend(requests)
00651
00652 def restart(self):
00653 try:
00654 if self.client is None:
00655 client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta)
00656 self.log("Attaching client %r" % client)
00657 self.client = client
00658 self.client.start()
00659 elif self.client.state == "idle":
00660 self.log("Sending request to existing client %r" % self.client)
00661 self.send_request()
00662 else:
00663 self.log("Client %r exists in state %r" % (self.client, self.client.state))
00664 except (rpki.async.ExitNow, SystemExit):
00665 raise
00666 except Exception, e:
00667 self.return_result(e)
00668
00669 def send_request(self):
00670 if self.queue:
00671 self.client.send_request(self.queue[0])
00672
00673 def detach(self, client):
00674 if client is self.client:
00675 self.log("Detaching client %r" % client)
00676 self.client = None
00677
00678 def return_result(self, result):
00679
00680 if not self.queue:
00681 self.log("No caller, this should not happen. Dropping result %r" % result)
00682
00683 req = self.queue.pop(0)
00684 self.log("Dequeuing request %r" % req)
00685
00686 try:
00687 if isinstance(result, http_response):
00688 self.log("Returning result %r to caller" % result)
00689 req.callback(result.body)
00690 else:
00691 assert isinstance(result, Exception)
00692 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
00693 req.errback(result)
00694 except (rpki.async.ExitNow, SystemExit):
00695 raise
00696 except:
00697 self.log("Unhandled exception from callback")
00698 rpki.log.traceback()
00699
00700 self.log("Queue: %r" % self.queue)
00701
00702 if self.queue:
00703 self.restart()
00704
00705 client_queues = {}
00706
00707 def client(msg, client_key, client_cert, server_ta, url, callback, errback):
00708 """
00709 Open client HTTPS connection, send a message, set up callbacks to
00710 handle response.
00711 """
00712
00713 u = urlparse.urlparse(url)
00714
00715 if (u.scheme not in ("", "https") or
00716 u.username is not None or
00717 u.password is not None or
00718 u.params != "" or
00719 u.query != "" or
00720 u.fragment != ""):
00721 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
00722
00723 if debug_http:
00724 rpki.log.debug("Contacting %s" % url)
00725
00726 request = http_request(
00727 cmd = "POST",
00728 path = u.path,
00729 body = msg,
00730 callback = callback,
00731 errback = errback,
00732 Host = u.hostname,
00733 Content_Type = rpki_content_type)
00734
00735 hostport = (u.hostname or "localhost", u.port or 80)
00736
00737 if debug_http:
00738 rpki.log.debug("Created request %r for %r" % (request, hostport))
00739 if not isinstance(server_ta, (tuple, list)):
00740 server_ta = (server_ta,)
00741 if hostport not in client_queues:
00742 client_queues[hostport] = http_queue(hostport, cert = client_cert, key = client_key, ta = server_ta)
00743 client_queues[hostport].request(request)
00744
00745
00746
00747
00748 if debug_http:
00749 rpki.log.debug("Scheduling connection startup for %r" % request)
00750 rpki.async.defer(client_queues[hostport].restart)
00751
00752 def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None):
00753 """
00754 Run an HTTPS server and wait (forever) for connections.
00755 """
00756
00757 if not isinstance(handlers, (tuple, list)):
00758 handlers = (("/", handlers),)
00759
00760 if not isinstance(client_ta, (tuple, list)):
00761 server_ta = (client_ta,)
00762
00763 http_listener(port = port, handlers = handlers, cert = server_cert, key = server_key, ta = client_ta, dynamic_ta = dynamic_https_trust_anchor)
00764 rpki.async.event_loop()
00765
00766 def build_https_ta_cache(certs):
00767 """
00768 Package up a collection of certificates into a form suitable for use
00769 as a dynamic HTTPS trust anchor set. Precise format of this
00770 collection is an internal conspiracy within the rpki.https module;
00771 at one point it was a POW.X509Store object, at the moment it's a
00772 Python set, what it will be tomorow is nobody else's business.
00773 """
00774
00775 return set(certs)