RPKI Engine 1.0
|
00001 """ 00002 HTTP utilities, both client and server. 00003 00004 $Id: http.py 3793 2011-04-27 04:34:52Z sra $ 00005 00006 Copyright (C) 2009-2011 Internet Systems Consortium ("ISC") 00007 00008 Permission to use, copy, modify, and distribute this software for any 00009 purpose with or without fee is hereby granted, provided that the above 00010 copyright notice and this permission notice appear in all copies. 00011 00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH 00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, 00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00018 PERFORMANCE OF THIS SOFTWARE. 00019 00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") 00021 00022 Permission to use, copy, modify, and distribute this software for any 00023 purpose with or without fee is hereby granted, provided that the above 00024 copyright notice and this permission notice appear in all copies. 00025 00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH 00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, 00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00032 PERFORMANCE OF THIS SOFTWARE. 00033 """ 00034 00035 import time, socket, asyncore, asynchat, urlparse, sys, random 00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log 00037 import rpki.POW 00038 00039 ## @var rpki_content_type 00040 # HTTP content type used for all RPKI messages. 00041 rpki_content_type = "application/x-rpki" 00042 00043 ## @var debug_http 00044 # Verbose chatter about HTTP streams. 00045 debug_http = False 00046 00047 ## @var want_persistent_client 00048 # Whether we want persistent HTTP client streams, when server also supports them. 00049 want_persistent_client = False 00050 00051 ## @var want_persistent_server 00052 # Whether we want persistent HTTP server streams, when client also supports them. 00053 want_persistent_server = False 00054 00055 ## @var default_client_timeout 00056 # Default HTTP client connection timeout. 00057 default_client_timeout = rpki.sundial.timedelta(minutes = 15) 00058 00059 ## @var default_server_timeout 00060 # Default HTTP server connection timeouts. Given our druthers, we'd 00061 # prefer that the client close the connection, as this avoids the 00062 # problem of client starting to reuse connection just as server closes 00063 # it, so this should be longer than the client timeout. 00064 default_server_timeout = rpki.sundial.timedelta(minutes = 20) 00065 00066 ## @var default_http_version 00067 # Preferred HTTP version. 00068 default_http_version = (1, 0) 00069 00070 ## @var default_tcp_port 00071 # Default port for clients and servers that don't specify one. 00072 default_tcp_port = 80 00073 00074 ## @var enable_ipv6_servers 00075 # Whether to enable IPv6 listeners. Enabled by default, as it should 00076 # be harmless. Has no effect if kernel doesn't support IPv6. 00077 enable_ipv6_servers = True 00078 00079 ## @var enable_ipv6_clients 00080 # Whether to consider IPv6 addresses when making connections. 00081 # Disabled by default, as IPv6 connectivity is still a bad joke in 00082 # far too much of the world. 00083 enable_ipv6_clients = False 00084 00085 ## @var use_adns 00086 # Whether to use rpki.adns code. This is still experimental, so it's 00087 # not (yet) enabled by default. 00088 use_adns = False 00089 00090 ## @var have_ipv6 00091 # Whether the current machine claims to support IPv6. Note that just 00092 # because the kernel supports it doesn't mean that the machine has 00093 # usable IPv6 connectivity. I don't know of a simple portable way to 00094 # probe for connectivity at runtime (the old test of "can you ping 00095 # SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set 00096 # automatically by probing using the socket() system call at runtime. 00097 try: 00098 socket.socket(socket.AF_INET6).close() 00099 socket.IPPROTO_IPV6 00100 socket.IPV6_V6ONLY 00101 except: 00102 have_ipv6 = False 00103 else: 00104 have_ipv6 = True 00105 00106 def supported_address_families(enable_ipv6): 00107 """ 00108 IP address families on which servers should listen, and to consider 00109 when selecting addresses for client connections. 00110 """ 00111 if enable_ipv6 and have_ipv6: 00112 return (socket.AF_INET, socket.AF_INET6) 00113 else: 00114 return (socket.AF_INET,) 00115 00116 def localhost_addrinfo(): 00117 """ 00118 Return pseudo-getaddrinfo results for localhost. 00119 """ 00120 result = [(socket.AF_INET, "127.0.0.1")] 00121 if enable_ipv6_clients and have_ipv6: 00122 result.append((socket.AF_INET6, "::1")) 00123 return result 00124 00125 class http_message(object): 00126 """ 00127 Virtual class representing of one HTTP message. 00128 """ 00129 00130 software_name = "ISC RPKI library" 00131 00132 def __init__(self, version = None, body = None, headers = None): 00133 self.version = version 00134 self.body = body 00135 self.headers = headers 00136 self.normalize_headers() 00137 00138 def normalize_headers(self, headers = None): 00139 """ 00140 Clean up (some of) the horrible messes that HTTP allows in its 00141 headers. 00142 """ 00143 if headers is None: 00144 headers = () if self.headers is None else self.headers.items() 00145 translate_underscore = True 00146 else: 00147 translate_underscore = False 00148 result = {} 00149 for k, v in headers: 00150 if translate_underscore: 00151 k = k.replace("_", "-") 00152 k = "-".join(s.capitalize() for s in k.split("-")) 00153 v = v.strip() 00154 if k in result: 00155 result[k] += ", " + v 00156 else: 00157 result[k] = v 00158 self.headers = result 00159 00160 @classmethod 00161 def parse_from_wire(cls, headers): 00162 """ 00163 Parse and normalize an incoming HTTP message. 00164 """ 00165 self = cls() 00166 headers = headers.split("\r\n") 00167 self.parse_first_line(*headers.pop(0).split(None, 2)) 00168 for i in xrange(len(headers) - 2, -1, -1): 00169 if headers[i + 1][0].isspace(): 00170 headers[i] += headers[i + 1] 00171 del headers[i + 1] 00172 self.normalize_headers([h.split(":", 1) for h in headers]) 00173 return self 00174 00175 def format(self): 00176 """ 00177 Format an outgoing HTTP message. 00178 """ 00179 s = self.format_first_line() 00180 if self.body is not None: 00181 assert isinstance(self.body, str) 00182 self.headers["Content-Length"] = len(self.body) 00183 for kv in self.headers.iteritems(): 00184 s += "%s: %s\r\n" % kv 00185 s += "\r\n" 00186 if self.body is not None: 00187 s += self.body 00188 return s 00189 00190 def __str__(self): 00191 return self.format() 00192 00193 def parse_version(self, version): 00194 """ 00195 Parse HTTP version, raise an exception if we can't. 00196 """ 00197 if version[:5] != "HTTP/": 00198 raise rpki.exceptions.HTTPBadVersion, "Couldn't parse version %s" % version 00199 self.version = tuple(int(i) for i in version[5:].split(".")) 00200 00201 @property 00202 def persistent(self): 00203 """ 00204 Figure out whether this HTTP message encourages a persistent connection. 00205 """ 00206 c = self.headers.get("Connection") 00207 if self.version == (1, 1): 00208 return c is None or "close" not in c.lower() 00209 elif self.version == (1, 0): 00210 return c is not None and "keep-alive" in c.lower() 00211 else: 00212 return False 00213 00214 class http_request(http_message): 00215 """ 00216 HTTP request message. 00217 """ 00218 00219 def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): 00220 assert cmd == "POST" or body is None 00221 http_message.__init__(self, version = version, body = body, headers = headers) 00222 self.cmd = cmd 00223 self.path = path 00224 self.callback = callback 00225 self.errback = errback 00226 self.retried = False 00227 00228 def parse_first_line(self, cmd, path, version): 00229 """ 00230 Parse first line of HTTP request message. 00231 """ 00232 self.parse_version(version) 00233 self.cmd = cmd 00234 self.path = path 00235 00236 def format_first_line(self): 00237 """ 00238 Format first line of HTTP request message, and set up the 00239 User-Agent header. 00240 """ 00241 self.headers.setdefault("User-Agent", self.software_name) 00242 return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) 00243 00244 def __repr__(self): 00245 return rpki.log.log_repr(self, self.cmd, self.path) 00246 00247 class http_response(http_message): 00248 """ 00249 HTTP response message. 00250 """ 00251 00252 def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): 00253 http_message.__init__(self, version = version, body = body, headers = headers) 00254 self.code = code 00255 self.reason = reason 00256 00257 def parse_first_line(self, version, code, reason): 00258 """ 00259 Parse first line of HTTP response message. 00260 """ 00261 self.parse_version(version) 00262 self.code = int(code) 00263 self.reason = reason 00264 00265 def format_first_line(self): 00266 """ 00267 Format first line of HTTP response message, and set up Date and 00268 Server headers. 00269 """ 00270 self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) 00271 self.headers.setdefault("Server", self.software_name) 00272 return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) 00273 00274 def __repr__(self): 00275 return rpki.log.log_repr(self, self.code, self.reason) 00276 00277 def log_method(self, msg, logger = rpki.log.debug): 00278 """ 00279 Logging method used in several different classes. 00280 """ 00281 assert isinstance(logger, rpki.log.logger) 00282 if debug_http or logger is not rpki.log.debug: 00283 logger("%r: %s" % (self, msg)) 00284 00285 def addr_to_string(addr): 00286 """ 00287 Convert socket addr tuple to printable string. Assumes 2-element 00288 tuple is IPv4, 4-element tuple is IPv6, throws TypeError for 00289 anything else. 00290 """ 00291 00292 if len(addr) == 2: 00293 return "%s:%d" % (addr[0], addr[1]) 00294 if len(addr) == 4: 00295 return "%s.%d" % (addr[0], addr[1]) 00296 raise TypeError 00297 00298 class http_stream(asynchat.async_chat): 00299 """ 00300 Virtual class representing an HTTP message stream. 00301 """ 00302 00303 log = log_method 00304 show_tracebacks = False 00305 00306 def __repr__(self): 00307 status = ["connected"] if self.connected else [] 00308 try: 00309 status.append(addr_to_string(self.addr)) 00310 except TypeError: 00311 pass 00312 return rpki.log.log_repr(self, *status) 00313 00314 def __init__(self, sock = None): 00315 asynchat.async_chat.__init__(self, sock) 00316 self.buffer = [] 00317 self.timer = rpki.async.timer(self.handle_timeout) 00318 self.restart() 00319 00320 def restart(self): 00321 """ 00322 (Re)start HTTP message parser, reset timer. 00323 """ 00324 assert not self.buffer 00325 self.chunk_handler = None 00326 self.set_terminator("\r\n\r\n") 00327 self.update_timeout() 00328 00329 def update_timeout(self): 00330 """ 00331 Put this stream's timer in known good state: set it to the 00332 stream's timeout value if we're doing timeouts, otherwise clear 00333 it. 00334 """ 00335 if self.timeout is not None: 00336 self.log("Setting timeout %s" % self.timeout) 00337 self.timer.set(self.timeout) 00338 else: 00339 self.log("Clearing timeout") 00340 self.timer.cancel() 00341 00342 def collect_incoming_data(self, data): 00343 """ 00344 Buffer incoming data from asynchat. 00345 """ 00346 self.buffer.append(data) 00347 self.update_timeout() 00348 00349 def get_buffer(self): 00350 """ 00351 Consume data buffered from asynchat. 00352 """ 00353 val = "".join(self.buffer) 00354 self.buffer = [] 00355 return val 00356 00357 def found_terminator(self): 00358 """ 00359 Asynchat reported that it found whatever terminator we set, so 00360 figure out what to do next. This can be messy, because we can be 00361 in any of several different states: 00362 00363 @li We might be handling chunked HTTP, in which case we have to 00364 initialize the chunk decoder; 00365 00366 @li We might have found the end of the message body, in which case 00367 we can (finally) process it; or 00368 00369 @li We might have just gotten to the end of the message headers, 00370 in which case we have to parse them to figure out which of three 00371 separate mechanisms (chunked, content-length, TCP close) is going 00372 to tell us how to find the end of the message body. 00373 """ 00374 self.update_timeout() 00375 if self.chunk_handler: 00376 self.chunk_handler() 00377 elif not isinstance(self.get_terminator(), str): 00378 self.handle_body() 00379 else: 00380 self.msg = self.parse_type.parse_from_wire(self.get_buffer()) 00381 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower(): 00382 self.msg.body = [] 00383 self.chunk_handler = self.chunk_header 00384 self.set_terminator("\r\n") 00385 elif "Content-Length" in self.msg.headers: 00386 self.set_terminator(int(self.msg.headers["Content-Length"])) 00387 else: 00388 self.handle_no_content_length() 00389 00390 def chunk_header(self): 00391 """ 00392 Asynchat just handed us what should be the header of one chunk of 00393 a chunked encoding stream. If this chunk has a body, set the 00394 stream up to read it; otherwise, this is the last chunk, so start 00395 the process of exiting the chunk decoder. 00396 """ 00397 n = int(self.get_buffer().partition(";")[0], 16) 00398 self.log("Chunk length %s" % n) 00399 if n: 00400 self.chunk_handler = self.chunk_body 00401 self.set_terminator(n) 00402 else: 00403 self.msg.body = "".join(self.msg.body) 00404 self.chunk_handler = self.chunk_discard_trailer 00405 00406 def chunk_body(self): 00407 """ 00408 Asynchat just handed us what should be the body of a chunk of the 00409 body of a chunked message (sic). Save it, and prepare to move on 00410 to the next chunk. 00411 """ 00412 self.log("Chunk body") 00413 self.msg.body += self.buffer 00414 self.buffer = [] 00415 self.chunk_handler = self.chunk_discard_crlf 00416 self.set_terminator("\r\n") 00417 00418 def chunk_discard_crlf(self): 00419 """ 00420 Consume the CRLF that terminates a chunk, reinitialize chunk 00421 decoder to be ready for the next chunk. 00422 """ 00423 self.log("Chunk CRLF") 00424 s = self.get_buffer() 00425 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) 00426 self.chunk_handler = self.chunk_header 00427 00428 def chunk_discard_trailer(self): 00429 """ 00430 Consume chunk trailer, which should be empty, then (finally!) exit 00431 the chunk decoder and hand complete message off to the application. 00432 """ 00433 self.log("Chunk trailer") 00434 s = self.get_buffer() 00435 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) 00436 self.chunk_handler = None 00437 self.handle_message() 00438 00439 def handle_body(self): 00440 """ 00441 Hand normal (not chunked) message off to the application. 00442 """ 00443 self.msg.body = self.get_buffer() 00444 self.handle_message() 00445 00446 def handle_error(self): 00447 """ 00448 Asynchat (or asyncore, or somebody) raised an exception. See 00449 whether it's one we should just pass along, otherwise log a stack 00450 trace and close the stream. 00451 """ 00452 self.timer.cancel() 00453 etype = sys.exc_info()[0] 00454 if etype in (SystemExit, rpki.async.ExitNow): 00455 raise 00456 if self.show_tracebacks: 00457 self.log("Error in HTTP stream handler", rpki.log.warn) 00458 rpki.log.traceback() 00459 if etype is not rpki.exceptions.HTTPClientAborted: 00460 self.log("Closing due to error", rpki.log.warn) 00461 self.close() 00462 00463 def handle_timeout(self): 00464 """ 00465 Inactivity timer expired, close connection with prejudice. 00466 """ 00467 self.log("Timeout, closing") 00468 self.close() 00469 00470 def handle_close(self): 00471 """ 00472 Wrapper around asynchat connection close handler, so that we can 00473 log the event, cancel timer, and so forth. 00474 """ 00475 self.log("Close event in HTTP stream handler") 00476 self.timer.cancel() 00477 asynchat.async_chat.handle_close(self) 00478 00479 class http_server(http_stream): 00480 """ 00481 HTTP server stream. 00482 """ 00483 00484 ## @var parse_type 00485 # Stream parser should look for incoming HTTP request messages. 00486 parse_type = http_request 00487 00488 ## @var timeout 00489 # Use the default server timeout value set in the module header. 00490 timeout = default_server_timeout 00491 00492 def __init__(self, sock, handlers): 00493 self.handlers = handlers 00494 http_stream.__init__(self, sock = sock) 00495 self.expect_close = not want_persistent_server 00496 self.log("Starting") 00497 00498 def handle_no_content_length(self): 00499 """ 00500 Handle an incoming message that used neither chunking nor a 00501 Content-Length header (that is: this message will be the last one 00502 in this server stream). No special action required. 00503 """ 00504 self.handle_message() 00505 00506 def find_handler(self, path): 00507 """ 00508 Helper method to search self.handlers. 00509 """ 00510 for s, h in self.handlers: 00511 if path.startswith(s): 00512 return h 00513 return None 00514 00515 def handle_message(self): 00516 """ 00517 HTTP layer managed to deliver a complete HTTP request to 00518 us, figure out what to do with it. Check the command and 00519 Content-Type, look for a handler, and if everything looks right, 00520 pass the message body, path, and a reply callback to the handler. 00521 """ 00522 self.log("Received request %r" % self.msg) 00523 if not self.msg.persistent: 00524 self.expect_close = True 00525 handler = self.find_handler(self.msg.path) 00526 error = None 00527 if self.msg.cmd != "POST": 00528 error = 501, "No handler for method %s" % self.msg.cmd 00529 elif self.msg.headers["Content-Type"] != rpki_content_type: 00530 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"] 00531 elif handler is None: 00532 error = 404, "No handler for URL %s" % self.msg.path 00533 if error is None: 00534 try: 00535 handler(self.msg.body, self.msg.path, self.send_reply) 00536 except (rpki.async.ExitNow, SystemExit): 00537 raise 00538 except Exception, e: 00539 if self.show_tracebacks: 00540 rpki.log.traceback() 00541 self.send_error(500, "Unhandled exception %s" % e) 00542 else: 00543 self.send_error(code = error[0], reason = error[1]) 00544 00545 def send_error(self, code, reason): 00546 """ 00547 Send an error response to this request. 00548 """ 00549 self.send_message(code = code, reason = reason) 00550 00551 def send_reply(self, code, body = None, reason = "OK"): 00552 """ 00553 Send a reply to this request. 00554 """ 00555 self.send_message(code = code, body = body, reason = reason) 00556 00557 def send_message(self, code, reason = "OK", body = None): 00558 """ 00559 Queue up reply message. If both parties agree that connection is 00560 persistant, and if no error occurred, restart this stream to 00561 listen for next message; otherwise, queue up a close event for 00562 this stream so it will shut down once the reply has been sent. 00563 """ 00564 self.log("Sending response %s %s" % (code, reason)) 00565 if code >= 400: 00566 self.expect_close = True 00567 msg = http_response(code = code, reason = reason, body = body, 00568 Content_Type = rpki_content_type, 00569 Connection = "Close" if self.expect_close else "Keep-Alive") 00570 self.push(msg.format()) 00571 if self.expect_close: 00572 self.log("Closing") 00573 self.timer.cancel() 00574 self.close_when_done() 00575 else: 00576 self.log("Listening for next message") 00577 self.restart() 00578 00579 class http_listener(asyncore.dispatcher): 00580 """ 00581 Listener for incoming HTTP connections. 00582 """ 00583 00584 log = log_method 00585 show_tracebacks = False 00586 00587 def __repr__(self): 00588 try: 00589 status = (addr_to_string(self.addr),) 00590 except TypeError: 00591 status = () 00592 return rpki.log.log_repr(self, *status) 00593 00594 def __init__(self, handlers, addrinfo): 00595 asyncore.dispatcher.__init__(self) 00596 self.handlers = handlers 00597 try: 00598 af, socktype, proto, canonname, sockaddr = addrinfo 00599 self.create_socket(af, socktype) 00600 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 00601 try: 00602 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 00603 except AttributeError: 00604 pass 00605 if have_ipv6 and af == socket.AF_INET6: 00606 self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) 00607 self.bind(sockaddr) 00608 self.listen(5) 00609 except: 00610 self.log("Couldn't set up HTTP listener", rpki.log.warn) 00611 if self.show_tracebacks: 00612 rpki.log.traceback() 00613 self.close() 00614 for h in handlers: 00615 self.log("Handling %s" % h[0]) 00616 00617 def handle_accept(self): 00618 """ 00619 Asyncore says we have an incoming connection, spawn an http_server 00620 stream for it and pass along all of our handler data. 00621 """ 00622 try: 00623 s, client = self.accept() 00624 self.log("Accepting connection from %s" % addr_to_string(client)) 00625 http_server(sock = s, handlers = self.handlers) 00626 except (rpki.async.ExitNow, SystemExit): 00627 raise 00628 except: 00629 self.log("Unable to accept connection") 00630 self.handle_error() 00631 00632 def handle_error(self): 00633 """ 00634 Asyncore signaled an error, pass it along or log it. 00635 """ 00636 if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): 00637 raise 00638 self.log("Error in HTTP listener", rpki.log.warn) 00639 if self.show_tracebacks: 00640 rpki.log.traceback() 00641 00642 class http_client(http_stream): 00643 """ 00644 HTTP client stream. 00645 """ 00646 00647 ## @var parse_type 00648 # Stream parser should look for incoming HTTP response messages. 00649 parse_type = http_response 00650 00651 ## @var timeout 00652 # Use the default client timeout value set in the module header. 00653 timeout = default_client_timeout 00654 00655 ## @var state 00656 # Application layer connection state. 00657 state = None 00658 00659 def __init__(self, queue, hostport): 00660 self.log("Creating new connection to %s" % addr_to_string(hostport)) 00661 http_stream.__init__(self) 00662 self.queue = queue 00663 self.host = hostport[0] 00664 self.port = hostport[1] 00665 self.set_state("opening") 00666 self.expect_close = not want_persistent_client 00667 00668 def start(self): 00669 """ 00670 Create socket and request a connection. 00671 """ 00672 if not use_adns: 00673 self.log("Not using ADNS") 00674 self.gotaddrinfo([(socket.AF_INET, self.host)]) 00675 elif self.host == "localhost": 00676 self.log("Bypassing DNS for localhost") 00677 self.gotaddrinfo(localhost_addrinfo()) 00678 else: 00679 import rpki.adns # This should move to start of file once we've decided to inflict it on all users 00680 families = supported_address_families(enable_ipv6_clients) 00681 self.log("Starting ADNS lookup for %s in families %r" % (self.host, families)) 00682 rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families) 00683 00684 def dns_error(self, e): 00685 """ 00686 Handle DNS lookup errors. For now, just whack the connection. 00687 Undoubtedly we should do something better with diagnostics here. 00688 """ 00689 self.handle_error() 00690 00691 def gotaddrinfo(self, addrinfo): 00692 """ 00693 Got address data from DNS, create socket and request connection. 00694 """ 00695 try: 00696 self.af, self.address = random.choice(addrinfo) 00697 self.log("Connecting to AF %s host %s port %s addr %s" % (self.af, self.host, self.port, self.address)) 00698 self.create_socket(self.af, socket.SOCK_STREAM) 00699 self.connect((self.address, self.port)) 00700 if self.addr is None: 00701 self.addr = (self.host, self.port) 00702 except (rpki.async.ExitNow, SystemExit): 00703 raise 00704 except: 00705 self.handle_error() 00706 00707 def handle_connect(self): 00708 """ 00709 Asyncore says socket has connected. 00710 """ 00711 self.log("Socket connected") 00712 self.set_state("idle") 00713 assert self.queue.client is self 00714 self.queue.send_request() 00715 00716 def set_state(self, state): 00717 """ 00718 Set HTTP client connection state. 00719 """ 00720 self.log("State transition %s => %s" % (self.state, state)) 00721 self.state = state 00722 00723 def handle_no_content_length(self): 00724 """ 00725 Handle response message that used neither chunking nor a 00726 Content-Length header (that is: this message will be the last one 00727 in this server stream). In this case we want to read until we 00728 reach the end of the data stream. 00729 """ 00730 self.set_terminator(None) 00731 00732 def send_request(self, msg): 00733 """ 00734 Queue up request message and kickstart connection. 00735 """ 00736 self.log("Sending request %r" % msg) 00737 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) 00738 self.set_state("request-sent") 00739 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" 00740 self.push(msg.format()) 00741 self.restart() 00742 00743 def handle_message(self): 00744 """ 00745 Handle incoming HTTP response message. Make sure we're in a state 00746 where we expect to see such a message (and allow the mysterious 00747 empty messages that Apache sends during connection close, no idea 00748 what that is supposed to be about). If everybody agrees that the 00749 connection should stay open, put it into an idle state; otherwise, 00750 arrange for the stream to shut down. 00751 """ 00752 00753 self.log("Message received, state %s" % self.state) 00754 00755 if not self.msg.persistent: 00756 self.expect_close = True 00757 00758 if self.state != "request-sent": 00759 if self.state == "closing": 00760 assert not self.msg.body 00761 self.log("Ignoring empty response received while closing") 00762 return 00763 raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state) 00764 00765 if self.expect_close: 00766 self.log("Closing") 00767 self.set_state("closing") 00768 self.close_when_done() 00769 else: 00770 self.log("Idling") 00771 self.set_state("idle") 00772 self.update_timeout() 00773 00774 if self.msg.code != 200: 00775 raise rpki.exceptions.HTTPRequestFailed, "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body) 00776 self.queue.return_result(self, self.msg, detach = self.expect_close) 00777 00778 def handle_close(self): 00779 """ 00780 Asyncore signaled connection close. If we were waiting for that 00781 to find the end of a response message, process the resulting 00782 message now; if we were waiting for the response to a request we 00783 sent, signal the error. 00784 """ 00785 http_stream.handle_close(self) 00786 self.log("State %s" % self.state) 00787 if self.get_terminator() is None: 00788 self.handle_body() 00789 elif self.state == "request-sent": 00790 raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event" 00791 else: 00792 self.queue.detach(self) 00793 00794 def handle_timeout(self): 00795 """ 00796 Connection idle timer has expired. Shut down connection in any 00797 case, noisily if we weren't idle. 00798 """ 00799 bad = self.state not in ("idle", "closing") 00800 if bad: 00801 self.log("Timeout while in state %s" % self.state, rpki.log.warn) 00802 http_stream.handle_timeout(self) 00803 if bad: 00804 raise rpki.exceptions.HTTPTimeout 00805 else: 00806 self.queue.detach(self) 00807 00808 def handle_error(self): 00809 """ 00810 Asyncore says something threw an exception. Log it, then shut 00811 down the connection and pass back the exception. 00812 """ 00813 eclass, edata = sys.exc_info()[0:2] 00814 self.log("Error on HTTP client connection %s:%s %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn) 00815 http_stream.handle_error(self) 00816 self.queue.return_result(self, edata, detach = True) 00817 00818 class http_queue(object): 00819 """ 00820 Queue of pending HTTP requests for a single destination. This class 00821 is very tightly coupled to http_client; http_client handles the HTTP 00822 stream itself, this class provides a slightly higher-level API. 00823 """ 00824 00825 log = log_method 00826 00827 def __repr__(self): 00828 return rpki.log.log_repr(self, "%s" % addr_to_string(self.hostport)) 00829 00830 def __init__(self, hostport): 00831 self.hostport = hostport 00832 self.client = None 00833 self.log("Created") 00834 self.queue = [] 00835 00836 def request(self, *requests): 00837 """ 00838 Append http_request object(s) to this queue. 00839 """ 00840 self.log("Adding requests %r" % requests) 00841 self.queue.extend(requests) 00842 00843 def restart(self): 00844 """ 00845 Send next request for this queue, if we can. This may involve 00846 starting a new http_client stream, reusing an existing idle 00847 stream, or just ignoring this request if there's an active client 00848 stream already; in the last case, handling of the response (or 00849 exception, or timeout) for the query currently in progress will 00850 call this method when it's time to kick out the next query. 00851 """ 00852 try: 00853 if self.client is None: 00854 self.client = http_client(self, self.hostport) 00855 self.log("Attached client %r" % self.client) 00856 self.client.start() 00857 elif self.client.state == "idle": 00858 self.log("Sending request to existing client %r" % self.client) 00859 self.send_request() 00860 else: 00861 self.log("Client %r exists in state %r" % (self.client, self.client.state)) 00862 except (rpki.async.ExitNow, SystemExit): 00863 raise 00864 except Exception, e: 00865 self.return_result(self.client, e, detach = True) 00866 00867 def send_request(self): 00868 """ 00869 Kick out the next query in this queue, if any. 00870 """ 00871 if self.queue: 00872 self.client.send_request(self.queue[0]) 00873 00874 def detach(self, client_): 00875 """ 00876 Detatch a client from this queue. Silently ignores attempting to 00877 detach a client that is not attached to this queue, to simplify 00878 handling of what otherwise would be a nasty set of race 00879 conditions. 00880 """ 00881 if client_ is self.client: 00882 self.log("Detaching client %r" % client_) 00883 self.client = None 00884 00885 def return_result(self, client, result, detach = False): 00886 """ 00887 Client stream has returned a result, which we need to pass along 00888 to the original caller. Result may be either an HTTP response 00889 message or an exception. In either case, once we're done 00890 processing this result, kick off next message in the queue, if any. 00891 """ 00892 00893 if client is not self.client: 00894 self.log("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) 00895 return 00896 00897 if detach: 00898 self.detach(client) 00899 00900 try: 00901 req = self.queue.pop(0) 00902 self.log("Dequeuing request %r" % req) 00903 except IndexError: 00904 self.log("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) 00905 return 00906 00907 try: 00908 if isinstance(result, http_response): 00909 self.log("Returning result %r to caller" % result) 00910 req.callback(result.body) 00911 else: 00912 assert isinstance(result, Exception) 00913 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn) 00914 req.errback(result) 00915 except (rpki.async.ExitNow, SystemExit): 00916 raise 00917 except: 00918 self.log("Unhandled exception from callback") 00919 rpki.log.traceback() 00920 00921 self.log("Queue: %r" % self.queue) 00922 00923 if self.queue: 00924 self.restart() 00925 00926 ## @var client_queues 00927 # Map of (host, port) tuples to http_queue objects. 00928 client_queues = {} 00929 00930 def client(msg, url, callback, errback): 00931 """ 00932 Open client HTTP connection, send a message, set up callbacks to 00933 handle response. 00934 """ 00935 00936 u = urlparse.urlparse(url) 00937 00938 if (u.scheme not in ("", "http") or 00939 u.username is not None or 00940 u.password is not None or 00941 u.params != "" or 00942 u.query != "" or 00943 u.fragment != ""): 00944 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url 00945 00946 if debug_http: 00947 rpki.log.debug("Contacting %s" % url) 00948 00949 request = http_request( 00950 cmd = "POST", 00951 path = u.path, 00952 body = msg, 00953 callback = callback, 00954 errback = errback, 00955 Host = u.hostname, 00956 Content_Type = rpki_content_type) 00957 00958 hostport = (u.hostname or "localhost", u.port or default_tcp_port) 00959 00960 if debug_http: 00961 rpki.log.debug("Created request %r for %s" % (request, addr_to_string(hostport))) 00962 if hostport not in client_queues: 00963 client_queues[hostport] = http_queue(hostport) 00964 client_queues[hostport].request(request) 00965 00966 # Defer connection attempt until after we've had time to process any 00967 # pending I/O events, in case connections have closed. 00968 00969 if debug_http: 00970 rpki.log.debug("Scheduling connection startup for %r" % request) 00971 rpki.async.defer(client_queues[hostport].restart) 00972 00973 def server(handlers, port, host = ""): 00974 """ 00975 Run an HTTP server and wait (forever) for connections. 00976 """ 00977 00978 if not isinstance(handlers, (tuple, list)): 00979 handlers = (("/", handlers),) 00980 00981 # Yes, this is sick. So is getaddrinfo() returning duplicate 00982 # records, which RedHat has the gall to claim is a feature. 00983 ai = [] 00984 for af in supported_address_families(enable_ipv6_servers): 00985 try: 00986 if host: 00987 h = host 00988 elif have_ipv6 and af == socket.AF_INET6: 00989 h = "::" 00990 else: 00991 h = "0.0.0.0" 00992 for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM): 00993 if a not in ai: 00994 ai.append(a) 00995 except socket.gaierror: 00996 pass 00997 00998 for a in ai: 00999 http_listener(addrinfo = a, handlers = handlers) 01000 01001 rpki.async.event_loop() 01002 01003 class caller(object): 01004 """ 01005 Handle client-side mechanics for protocols based on HTTP, CMS, and 01006 rpki.xml_utils. Calling sequence is intended to nest within 01007 rpki.async.sync_wrapper. 01008 """ 01009 01010 debug = False 01011 01012 def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None): 01013 self.proto = proto 01014 self.client_key = client_key 01015 self.client_cert = client_cert 01016 self.server_ta = server_ta 01017 self.server_cert = server_cert 01018 self.url = url 01019 if debug is not None: 01020 self.debug = debug 01021 01022 def __call__(self, cb, eb, *pdus): 01023 01024 def done(r_der): 01025 """ 01026 Handle CMS-wrapped XML response message. 01027 """ 01028 r_cms = self.proto.cms_msg(DER = r_der) 01029 r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) 01030 if self.debug: 01031 print "<!-- Reply -->" 01032 print r_cms.pretty_print_content() 01033 cb(r_msg) 01034 01035 q_msg = self.proto.msg.query(*pdus) 01036 q_cms = self.proto.cms_msg() 01037 q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert) 01038 if self.debug: 01039 print "<!-- Query -->" 01040 print q_cms.pretty_print_content() 01041 01042 client(url = self.url, msg = q_der, callback = done, errback = eb)