RPKI Engine
1.0
|
00001 """ 00002 HTTP utilities, both client and server. 00003 00004 $Id: http.py 4026 2011-10-07 21:43:47Z sra $ 00005 00006 Copyright (C) 2009-2011 Internet Systems Consortium ("ISC") 00007 00008 Permission to use, copy, modify, and distribute this software for any 00009 purpose with or without fee is hereby granted, provided that the above 00010 copyright notice and this permission notice appear in all copies. 00011 00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH 00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, 00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00018 PERFORMANCE OF THIS SOFTWARE. 00019 00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") 00021 00022 Permission to use, copy, modify, and distribute this software for any 00023 purpose with or without fee is hereby granted, provided that the above 00024 copyright notice and this permission notice appear in all copies. 00025 00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH 00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, 00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00032 PERFORMANCE OF THIS SOFTWARE. 00033 """ 00034 00035 import time, socket, asyncore, asynchat, urlparse, sys, random 00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log 00037 import rpki.POW 00038 00039 ## @var rpki_content_type 00040 # HTTP content type used for all RPKI messages. 00041 rpki_content_type = "application/x-rpki" 00042 00043 ## @var debug_http 00044 # Verbose chatter about HTTP streams. 00045 debug_http = False 00046 00047 ## @var want_persistent_client 00048 # Whether we want persistent HTTP client streams, when server also supports them. 00049 want_persistent_client = False 00050 00051 ## @var want_persistent_server 00052 # Whether we want persistent HTTP server streams, when client also supports them. 00053 want_persistent_server = False 00054 00055 ## @var default_client_timeout 00056 # Default HTTP client connection timeout. 00057 default_client_timeout = rpki.sundial.timedelta(minutes = 5) 00058 00059 ## @var default_server_timeout 00060 # Default HTTP server connection timeouts. Given our druthers, we'd 00061 # prefer that the client close the connection, as this avoids the 00062 # problem of client starting to reuse connection just as server closes 00063 # it, so this should be longer than the client timeout. 00064 default_server_timeout = rpki.sundial.timedelta(minutes = 10) 00065 00066 ## @var default_http_version 00067 # Preferred HTTP version. 00068 default_http_version = (1, 0) 00069 00070 ## @var default_tcp_port 00071 # Default port for clients and servers that don't specify one. 00072 default_tcp_port = 80 00073 00074 ## @var enable_ipv6_servers 00075 # Whether to enable IPv6 listeners. Enabled by default, as it should 00076 # be harmless. Has no effect if kernel doesn't support IPv6. 00077 enable_ipv6_servers = True 00078 00079 ## @var enable_ipv6_clients 00080 # Whether to consider IPv6 addresses when making connections. 00081 # Disabled by default, as IPv6 connectivity is still a bad joke in 00082 # far too much of the world. 00083 enable_ipv6_clients = False 00084 00085 ## @var use_adns 00086 # Whether to use rpki.adns code. This is still experimental, so it's 00087 # not (yet) enabled by default. 00088 use_adns = False 00089 00090 ## @var have_ipv6 00091 # Whether the current machine claims to support IPv6. Note that just 00092 # because the kernel supports it doesn't mean that the machine has 00093 # usable IPv6 connectivity. I don't know of a simple portable way to 00094 # probe for connectivity at runtime (the old test of "can you ping 00095 # SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set 00096 # automatically by probing using the socket() system call at runtime. 00097 try: 00098 socket.socket(socket.AF_INET6).close() 00099 socket.IPPROTO_IPV6 00100 socket.IPV6_V6ONLY 00101 except: 00102 have_ipv6 = False 00103 else: 00104 have_ipv6 = True 00105 00106 def supported_address_families(enable_ipv6): 00107 """ 00108 IP address families on which servers should listen, and to consider 00109 when selecting addresses for client connections. 00110 """ 00111 if enable_ipv6 and have_ipv6: 00112 return (socket.AF_INET, socket.AF_INET6) 00113 else: 00114 return (socket.AF_INET,) 00115 00116 def localhost_addrinfo(): 00117 """ 00118 Return pseudo-getaddrinfo results for localhost. 00119 """ 00120 result = [(socket.AF_INET, "127.0.0.1")] 00121 if enable_ipv6_clients and have_ipv6: 00122 result.append((socket.AF_INET6, "::1")) 00123 return result 00124 00125 class http_message(object): 00126 """ 00127 Virtual class representing of one HTTP message. 00128 """ 00129 00130 software_name = "ISC RPKI library" 00131 00132 def __init__(self, version = None, body = None, headers = None): 00133 self.version = version 00134 self.body = body 00135 self.headers = headers 00136 self.normalize_headers() 00137 00138 def normalize_headers(self, headers = None): 00139 """ 00140 Clean up (some of) the horrible messes that HTTP allows in its 00141 headers. 00142 """ 00143 if headers is None: 00144 headers = () if self.headers is None else self.headers.items() 00145 translate_underscore = True 00146 else: 00147 translate_underscore = False 00148 result = {} 00149 for k, v in headers: 00150 if translate_underscore: 00151 k = k.replace("_", "-") 00152 k = "-".join(s.capitalize() for s in k.split("-")) 00153 v = v.strip() 00154 if k in result: 00155 result[k] += ", " + v 00156 else: 00157 result[k] = v 00158 self.headers = result 00159 00160 @classmethod 00161 def parse_from_wire(cls, headers): 00162 """ 00163 Parse and normalize an incoming HTTP message. 00164 """ 00165 self = cls() 00166 headers = headers.split("\r\n") 00167 self.parse_first_line(*headers.pop(0).split(None, 2)) 00168 for i in xrange(len(headers) - 2, -1, -1): 00169 if headers[i + 1][0].isspace(): 00170 headers[i] += headers[i + 1] 00171 del headers[i + 1] 00172 self.normalize_headers([h.split(":", 1) for h in headers]) 00173 return self 00174 00175 def format(self): 00176 """ 00177 Format an outgoing HTTP message. 00178 """ 00179 s = self.format_first_line() 00180 if self.body is not None: 00181 assert isinstance(self.body, str) 00182 self.headers["Content-Length"] = len(self.body) 00183 for kv in self.headers.iteritems(): 00184 s += "%s: %s\r\n" % kv 00185 s += "\r\n" 00186 if self.body is not None: 00187 s += self.body 00188 return s 00189 00190 def __str__(self): 00191 return self.format() 00192 00193 def parse_version(self, version): 00194 """ 00195 Parse HTTP version, raise an exception if we can't. 00196 """ 00197 if version[:5] != "HTTP/": 00198 raise rpki.exceptions.HTTPBadVersion, "Couldn't parse version %s" % version 00199 self.version = tuple(int(i) for i in version[5:].split(".")) 00200 00201 @property 00202 def persistent(self): 00203 """ 00204 Figure out whether this HTTP message encourages a persistent connection. 00205 """ 00206 c = self.headers.get("Connection") 00207 if self.version == (1, 1): 00208 return c is None or "close" not in c.lower() 00209 elif self.version == (1, 0): 00210 return c is not None and "keep-alive" in c.lower() 00211 else: 00212 return False 00213 00214 class http_request(http_message): 00215 """ 00216 HTTP request message. 00217 """ 00218 00219 def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): 00220 assert cmd == "POST" or body is None 00221 http_message.__init__(self, version = version, body = body, headers = headers) 00222 self.cmd = cmd 00223 self.path = path 00224 self.callback = callback 00225 self.errback = errback 00226 self.retried = False 00227 00228 def parse_first_line(self, cmd, path, version): 00229 """ 00230 Parse first line of HTTP request message. 00231 """ 00232 self.parse_version(version) 00233 self.cmd = cmd 00234 self.path = path 00235 00236 def format_first_line(self): 00237 """ 00238 Format first line of HTTP request message, and set up the 00239 User-Agent header. 00240 """ 00241 self.headers.setdefault("User-Agent", self.software_name) 00242 return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) 00243 00244 def __repr__(self): 00245 return rpki.log.log_repr(self, self.cmd, self.path) 00246 00247 class http_response(http_message): 00248 """ 00249 HTTP response message. 00250 """ 00251 00252 def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): 00253 http_message.__init__(self, version = version, body = body, headers = headers) 00254 self.code = code 00255 self.reason = reason 00256 00257 def parse_first_line(self, version, code, reason): 00258 """ 00259 Parse first line of HTTP response message. 00260 """ 00261 self.parse_version(version) 00262 self.code = int(code) 00263 self.reason = reason 00264 00265 def format_first_line(self): 00266 """ 00267 Format first line of HTTP response message, and set up Date and 00268 Server headers. 00269 """ 00270 self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) 00271 self.headers.setdefault("Server", self.software_name) 00272 return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) 00273 00274 def __repr__(self): 00275 return rpki.log.log_repr(self, self.code, self.reason) 00276 00277 def log_method(self, msg, logger = rpki.log.debug): 00278 """ 00279 Logging method used in several different classes. 00280 """ 00281 assert isinstance(logger, rpki.log.logger) 00282 if debug_http or logger is not rpki.log.debug: 00283 logger("%r: %s" % (self, msg)) 00284 00285 def addr_to_string(addr): 00286 """ 00287 Convert socket addr tuple to printable string. Assumes 2-element 00288 tuple is IPv4, 4-element tuple is IPv6, throws TypeError for 00289 anything else. 00290 """ 00291 00292 if len(addr) == 2: 00293 return "%s:%d" % (addr[0], addr[1]) 00294 if len(addr) == 4: 00295 return "%s.%d" % (addr[0], addr[1]) 00296 raise TypeError 00297 00298 class http_stream(asynchat.async_chat): 00299 """ 00300 Virtual class representing an HTTP message stream. 00301 """ 00302 00303 log = log_method 00304 00305 def __repr__(self): 00306 status = ["connected"] if self.connected else [] 00307 try: 00308 status.append(addr_to_string(self.addr)) 00309 except TypeError: 00310 pass 00311 return rpki.log.log_repr(self, *status) 00312 00313 def __init__(self, sock = None): 00314 asynchat.async_chat.__init__(self, sock) 00315 self.buffer = [] 00316 self.timer = rpki.async.timer(self.handle_timeout) 00317 self.restart() 00318 00319 def restart(self): 00320 """ 00321 (Re)start HTTP message parser, reset timer. 00322 """ 00323 assert not self.buffer 00324 self.chunk_handler = None 00325 self.set_terminator("\r\n\r\n") 00326 self.update_timeout() 00327 00328 def update_timeout(self): 00329 """ 00330 Put this stream's timer in known good state: set it to the 00331 stream's timeout value if we're doing timeouts, otherwise clear 00332 it. 00333 """ 00334 if self.timeout is not None: 00335 self.log("Setting timeout %s" % self.timeout) 00336 self.timer.set(self.timeout) 00337 else: 00338 self.log("Clearing timeout") 00339 self.timer.cancel() 00340 00341 def collect_incoming_data(self, data): 00342 """ 00343 Buffer incoming data from asynchat. 00344 """ 00345 self.buffer.append(data) 00346 self.update_timeout() 00347 00348 def get_buffer(self): 00349 """ 00350 Consume data buffered from asynchat. 00351 """ 00352 val = "".join(self.buffer) 00353 self.buffer = [] 00354 return val 00355 00356 def found_terminator(self): 00357 """ 00358 Asynchat reported that it found whatever terminator we set, so 00359 figure out what to do next. This can be messy, because we can be 00360 in any of several different states: 00361 00362 @li We might be handling chunked HTTP, in which case we have to 00363 initialize the chunk decoder; 00364 00365 @li We might have found the end of the message body, in which case 00366 we can (finally) process it; or 00367 00368 @li We might have just gotten to the end of the message headers, 00369 in which case we have to parse them to figure out which of three 00370 separate mechanisms (chunked, content-length, TCP close) is going 00371 to tell us how to find the end of the message body. 00372 """ 00373 self.update_timeout() 00374 if self.chunk_handler: 00375 self.chunk_handler() 00376 elif not isinstance(self.get_terminator(), str): 00377 self.handle_body() 00378 else: 00379 self.msg = self.parse_type.parse_from_wire(self.get_buffer()) 00380 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower(): 00381 self.msg.body = [] 00382 self.chunk_handler = self.chunk_header 00383 self.set_terminator("\r\n") 00384 elif "Content-Length" in self.msg.headers: 00385 self.set_terminator(int(self.msg.headers["Content-Length"])) 00386 else: 00387 self.handle_no_content_length() 00388 00389 def chunk_header(self): 00390 """ 00391 Asynchat just handed us what should be the header of one chunk of 00392 a chunked encoding stream. If this chunk has a body, set the 00393 stream up to read it; otherwise, this is the last chunk, so start 00394 the process of exiting the chunk decoder. 00395 """ 00396 n = int(self.get_buffer().partition(";")[0], 16) 00397 self.log("Chunk length %s" % n) 00398 if n: 00399 self.chunk_handler = self.chunk_body 00400 self.set_terminator(n) 00401 else: 00402 self.msg.body = "".join(self.msg.body) 00403 self.chunk_handler = self.chunk_discard_trailer 00404 00405 def chunk_body(self): 00406 """ 00407 Asynchat just handed us what should be the body of a chunk of the 00408 body of a chunked message (sic). Save it, and prepare to move on 00409 to the next chunk. 00410 """ 00411 self.log("Chunk body") 00412 self.msg.body += self.buffer 00413 self.buffer = [] 00414 self.chunk_handler = self.chunk_discard_crlf 00415 self.set_terminator("\r\n") 00416 00417 def chunk_discard_crlf(self): 00418 """ 00419 Consume the CRLF that terminates a chunk, reinitialize chunk 00420 decoder to be ready for the next chunk. 00421 """ 00422 self.log("Chunk CRLF") 00423 s = self.get_buffer() 00424 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) 00425 self.chunk_handler = self.chunk_header 00426 00427 def chunk_discard_trailer(self): 00428 """ 00429 Consume chunk trailer, which should be empty, then (finally!) exit 00430 the chunk decoder and hand complete message off to the application. 00431 """ 00432 self.log("Chunk trailer") 00433 s = self.get_buffer() 00434 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) 00435 self.chunk_handler = None 00436 self.handle_message() 00437 00438 def handle_body(self): 00439 """ 00440 Hand normal (not chunked) message off to the application. 00441 """ 00442 self.msg.body = self.get_buffer() 00443 self.handle_message() 00444 00445 def handle_error(self): 00446 """ 00447 Asynchat (or asyncore, or somebody) raised an exception. See 00448 whether it's one we should just pass along, otherwise log a stack 00449 trace and close the stream. 00450 """ 00451 self.timer.cancel() 00452 etype = sys.exc_info()[0] 00453 if etype in (SystemExit, rpki.async.ExitNow): 00454 raise 00455 rpki.log.traceback() 00456 if etype is not rpki.exceptions.HTTPClientAborted: 00457 self.log("Closing due to error", rpki.log.warn) 00458 self.close() 00459 00460 def handle_timeout(self): 00461 """ 00462 Inactivity timer expired, close connection with prejudice. 00463 """ 00464 self.log("Timeout, closing") 00465 self.close() 00466 00467 def handle_close(self): 00468 """ 00469 Wrapper around asynchat connection close handler, so that we can 00470 log the event, cancel timer, and so forth. 00471 """ 00472 self.log("Close event in HTTP stream handler") 00473 self.timer.cancel() 00474 asynchat.async_chat.handle_close(self) 00475 00476 class http_server(http_stream): 00477 """ 00478 HTTP server stream. 00479 """ 00480 00481 ## @var parse_type 00482 # Stream parser should look for incoming HTTP request messages. 00483 parse_type = http_request 00484 00485 ## @var timeout 00486 # Use the default server timeout value set in the module header. 00487 timeout = default_server_timeout 00488 00489 def __init__(self, sock, handlers): 00490 self.handlers = handlers 00491 http_stream.__init__(self, sock = sock) 00492 self.expect_close = not want_persistent_server 00493 self.log("Starting") 00494 00495 def handle_no_content_length(self): 00496 """ 00497 Handle an incoming message that used neither chunking nor a 00498 Content-Length header (that is: this message will be the last one 00499 in this server stream). No special action required. 00500 """ 00501 self.handle_message() 00502 00503 def find_handler(self, path): 00504 """ 00505 Helper method to search self.handlers. 00506 """ 00507 for s, h in self.handlers: 00508 if path.startswith(s): 00509 return h 00510 return None 00511 00512 def handle_message(self): 00513 """ 00514 HTTP layer managed to deliver a complete HTTP request to 00515 us, figure out what to do with it. Check the command and 00516 Content-Type, look for a handler, and if everything looks right, 00517 pass the message body, path, and a reply callback to the handler. 00518 """ 00519 self.log("Received request %r" % self.msg) 00520 if not self.msg.persistent: 00521 self.expect_close = True 00522 handler = self.find_handler(self.msg.path) 00523 error = None 00524 if self.msg.cmd != "POST": 00525 error = 501, "No handler for method %s" % self.msg.cmd 00526 elif self.msg.headers["Content-Type"] != rpki_content_type: 00527 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"] 00528 elif handler is None: 00529 error = 404, "No handler for URL %s" % self.msg.path 00530 if error is None: 00531 try: 00532 handler(self.msg.body, self.msg.path, self.send_reply) 00533 except (rpki.async.ExitNow, SystemExit): 00534 raise 00535 except Exception, e: 00536 rpki.log.traceback() 00537 self.send_error(500, "Unhandled exception %s" % e) 00538 else: 00539 self.send_error(code = error[0], reason = error[1]) 00540 00541 def send_error(self, code, reason): 00542 """ 00543 Send an error response to this request. 00544 """ 00545 self.send_message(code = code, reason = reason) 00546 00547 def send_reply(self, code, body = None, reason = "OK"): 00548 """ 00549 Send a reply to this request. 00550 """ 00551 self.send_message(code = code, body = body, reason = reason) 00552 00553 def send_message(self, code, reason = "OK", body = None): 00554 """ 00555 Queue up reply message. If both parties agree that connection is 00556 persistant, and if no error occurred, restart this stream to 00557 listen for next message; otherwise, queue up a close event for 00558 this stream so it will shut down once the reply has been sent. 00559 """ 00560 self.log("Sending response %s %s" % (code, reason)) 00561 if code >= 400: 00562 self.expect_close = True 00563 msg = http_response(code = code, reason = reason, body = body, 00564 Content_Type = rpki_content_type, 00565 Connection = "Close" if self.expect_close else "Keep-Alive") 00566 self.push(msg.format()) 00567 if self.expect_close: 00568 self.log("Closing") 00569 self.timer.cancel() 00570 self.close_when_done() 00571 else: 00572 self.log("Listening for next message") 00573 self.restart() 00574 00575 class http_listener(asyncore.dispatcher): 00576 """ 00577 Listener for incoming HTTP connections. 00578 """ 00579 00580 log = log_method 00581 00582 def __repr__(self): 00583 try: 00584 status = (addr_to_string(self.addr),) 00585 except TypeError: 00586 status = () 00587 return rpki.log.log_repr(self, *status) 00588 00589 def __init__(self, handlers, addrinfo): 00590 asyncore.dispatcher.__init__(self) 00591 self.handlers = handlers 00592 try: 00593 af, socktype, proto, canonname, sockaddr = addrinfo 00594 self.create_socket(af, socktype) 00595 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 00596 try: 00597 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 00598 except AttributeError: 00599 pass 00600 if have_ipv6 and af == socket.AF_INET6: 00601 self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) 00602 self.bind(sockaddr) 00603 self.listen(5) 00604 except Exception, e: 00605 self.log("Couldn't set up HTTP listener: %s" % e, rpki.log.warn) 00606 rpki.log.traceback() 00607 self.close() 00608 for h in handlers: 00609 self.log("Handling %s" % h[0]) 00610 00611 def handle_accept(self): 00612 """ 00613 Asyncore says we have an incoming connection, spawn an http_server 00614 stream for it and pass along all of our handler data. 00615 """ 00616 try: 00617 s, client = self.accept() 00618 self.log("Accepting connection from %s" % addr_to_string(client)) 00619 http_server(sock = s, handlers = self.handlers) 00620 except (rpki.async.ExitNow, SystemExit): 00621 raise 00622 except Exception, e: 00623 self.log("Unable to accept connection: %s" % e) 00624 self.handle_error() 00625 00626 def handle_error(self): 00627 """ 00628 Asyncore signaled an error, pass it along or log it. 00629 """ 00630 if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): 00631 raise 00632 self.log("Error in HTTP listener", rpki.log.warn) 00633 rpki.log.traceback() 00634 00635 class http_client(http_stream): 00636 """ 00637 HTTP client stream. 00638 """ 00639 00640 ## @var parse_type 00641 # Stream parser should look for incoming HTTP response messages. 00642 parse_type = http_response 00643 00644 ## @var timeout 00645 # Use the default client timeout value set in the module header. 00646 timeout = default_client_timeout 00647 00648 ## @var state 00649 # Application layer connection state. 00650 state = None 00651 00652 def __init__(self, queue, hostport): 00653 self.log("Creating new connection to %s" % addr_to_string(hostport)) 00654 http_stream.__init__(self) 00655 self.queue = queue 00656 self.host = hostport[0] 00657 self.port = hostport[1] 00658 self.set_state("opening") 00659 self.expect_close = not want_persistent_client 00660 00661 def start(self): 00662 """ 00663 Create socket and request a connection. 00664 """ 00665 if not use_adns: 00666 self.log("Not using ADNS") 00667 self.gotaddrinfo([(socket.AF_INET, self.host)]) 00668 elif self.host == "localhost": 00669 self.log("Bypassing DNS for localhost") 00670 self.gotaddrinfo(localhost_addrinfo()) 00671 else: 00672 import rpki.adns # This should move to start of file once we've decided to inflict it on all users 00673 families = supported_address_families(enable_ipv6_clients) 00674 self.log("Starting ADNS lookup for %s in families %r" % (self.host, families)) 00675 rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families) 00676 00677 def dns_error(self, e): 00678 """ 00679 Handle DNS lookup errors. For now, just whack the connection. 00680 Undoubtedly we should do something better with diagnostics here. 00681 """ 00682 self.handle_error() 00683 00684 def gotaddrinfo(self, addrinfo): 00685 """ 00686 Got address data from DNS, create socket and request connection. 00687 """ 00688 try: 00689 self.af, self.address = random.choice(addrinfo) 00690 self.log("Connecting to AF %s host %s port %s addr %s" % (self.af, self.host, self.port, self.address)) 00691 self.create_socket(self.af, socket.SOCK_STREAM) 00692 self.connect((self.address, self.port)) 00693 if self.addr is None: 00694 self.addr = (self.host, self.port) 00695 self.update_timeout() 00696 except (rpki.async.ExitNow, SystemExit): 00697 raise 00698 except Exception: 00699 self.handle_error() 00700 00701 def handle_connect(self): 00702 """ 00703 Asyncore says socket has connected. 00704 """ 00705 self.log("Socket connected") 00706 self.set_state("idle") 00707 assert self.queue.client is self 00708 self.queue.send_request() 00709 00710 def set_state(self, state): 00711 """ 00712 Set HTTP client connection state. 00713 """ 00714 self.log("State transition %s => %s" % (self.state, state)) 00715 self.state = state 00716 00717 def handle_no_content_length(self): 00718 """ 00719 Handle response message that used neither chunking nor a 00720 Content-Length header (that is: this message will be the last one 00721 in this server stream). In this case we want to read until we 00722 reach the end of the data stream. 00723 """ 00724 self.set_terminator(None) 00725 00726 def send_request(self, msg): 00727 """ 00728 Queue up request message and kickstart connection. 00729 """ 00730 self.log("Sending request %r" % msg) 00731 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) 00732 self.set_state("request-sent") 00733 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" 00734 self.push(msg.format()) 00735 self.restart() 00736 00737 def handle_message(self): 00738 """ 00739 Handle incoming HTTP response message. Make sure we're in a state 00740 where we expect to see such a message (and allow the mysterious 00741 empty messages that Apache sends during connection close, no idea 00742 what that is supposed to be about). If everybody agrees that the 00743 connection should stay open, put it into an idle state; otherwise, 00744 arrange for the stream to shut down. 00745 """ 00746 00747 self.log("Message received, state %s" % self.state) 00748 00749 if not self.msg.persistent: 00750 self.expect_close = True 00751 00752 if self.state != "request-sent": 00753 if self.state == "closing": 00754 assert not self.msg.body 00755 self.log("Ignoring empty response received while closing") 00756 return 00757 raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state) 00758 00759 if self.expect_close: 00760 self.log("Closing") 00761 self.set_state("closing") 00762 self.close_when_done() 00763 else: 00764 self.log("Idling") 00765 self.set_state("idle") 00766 self.update_timeout() 00767 00768 if self.msg.code != 200: 00769 raise rpki.exceptions.HTTPRequestFailed, "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body) 00770 self.queue.return_result(self, self.msg, detach = self.expect_close) 00771 00772 def handle_close(self): 00773 """ 00774 Asyncore signaled connection close. If we were waiting for that 00775 to find the end of a response message, process the resulting 00776 message now; if we were waiting for the response to a request we 00777 sent, signal the error. 00778 """ 00779 http_stream.handle_close(self) 00780 self.log("State %s" % self.state) 00781 if self.get_terminator() is None: 00782 self.handle_body() 00783 elif self.state == "request-sent": 00784 raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event" 00785 else: 00786 self.queue.detach(self) 00787 00788 def handle_timeout(self): 00789 """ 00790 Connection idle timer has expired. Shut down connection in any 00791 case, noisily if we weren't idle. 00792 """ 00793 bad = self.state not in ("idle", "closing") 00794 if bad: 00795 self.log("Timeout while in state %s" % self.state, rpki.log.warn) 00796 http_stream.handle_timeout(self) 00797 if bad: 00798 try: 00799 raise rpki.exceptions.HTTPTimeout 00800 except: 00801 self.handle_error() 00802 else: 00803 self.queue.detach(self) 00804 00805 def handle_error(self): 00806 """ 00807 Asyncore says something threw an exception. Log it, then shut 00808 down the connection and pass back the exception. 00809 """ 00810 eclass, edata = sys.exc_info()[0:2] 00811 self.log("Error on HTTP client connection %s:%s %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn) 00812 http_stream.handle_error(self) 00813 self.queue.return_result(self, edata, detach = True) 00814 00815 class http_queue(object): 00816 """ 00817 Queue of pending HTTP requests for a single destination. This class 00818 is very tightly coupled to http_client; http_client handles the HTTP 00819 stream itself, this class provides a slightly higher-level API. 00820 """ 00821 00822 log = log_method 00823 00824 def __repr__(self): 00825 return rpki.log.log_repr(self, "%s" % addr_to_string(self.hostport)) 00826 00827 def __init__(self, hostport): 00828 self.hostport = hostport 00829 self.client = None 00830 self.log("Created") 00831 self.queue = [] 00832 00833 def request(self, *requests): 00834 """ 00835 Append http_request object(s) to this queue. 00836 """ 00837 self.log("Adding requests %r" % requests) 00838 self.queue.extend(requests) 00839 00840 def restart(self): 00841 """ 00842 Send next request for this queue, if we can. This may involve 00843 starting a new http_client stream, reusing an existing idle 00844 stream, or just ignoring this request if there's an active client 00845 stream already; in the last case, handling of the response (or 00846 exception, or timeout) for the query currently in progress will 00847 call this method when it's time to kick out the next query. 00848 """ 00849 try: 00850 if self.client is None: 00851 self.client = http_client(self, self.hostport) 00852 self.log("Attached client %r" % self.client) 00853 self.client.start() 00854 elif self.client.state == "idle": 00855 self.log("Sending request to existing client %r" % self.client) 00856 self.send_request() 00857 else: 00858 self.log("Client %r exists in state %r" % (self.client, self.client.state)) 00859 except (rpki.async.ExitNow, SystemExit): 00860 raise 00861 except Exception, e: 00862 self.return_result(self.client, e, detach = True) 00863 00864 def send_request(self): 00865 """ 00866 Kick out the next query in this queue, if any. 00867 """ 00868 if self.queue: 00869 self.client.send_request(self.queue[0]) 00870 00871 def detach(self, client_): 00872 """ 00873 Detatch a client from this queue. Silently ignores attempting to 00874 detach a client that is not attached to this queue, to simplify 00875 handling of what otherwise would be a nasty set of race 00876 conditions. 00877 """ 00878 if client_ is self.client: 00879 self.log("Detaching client %r" % client_) 00880 self.client = None 00881 00882 def return_result(self, client, result, detach = False): 00883 """ 00884 Client stream has returned a result, which we need to pass along 00885 to the original caller. Result may be either an HTTP response 00886 message or an exception. In either case, once we're done 00887 processing this result, kick off next message in the queue, if any. 00888 """ 00889 00890 if client is not self.client: 00891 self.log("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) 00892 return 00893 00894 if detach: 00895 self.detach(client) 00896 00897 try: 00898 req = self.queue.pop(0) 00899 self.log("Dequeuing request %r" % req) 00900 except IndexError: 00901 self.log("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) 00902 return 00903 00904 assert isinstance(result, http_response) or isinstance(result, Exception) 00905 00906 if isinstance(result, http_response): 00907 try: 00908 self.log("Returning result %r to caller" % result) 00909 req.callback(result.body) 00910 except (rpki.async.ExitNow, SystemExit): 00911 raise 00912 except Exception, e: 00913 result = e 00914 00915 if isinstance(result, Exception): 00916 try: 00917 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn) 00918 req.errback(result) 00919 except (rpki.async.ExitNow, SystemExit): 00920 raise 00921 except Exception: 00922 # 00923 # If we get here, we may have lost the event chain. Not 00924 # obvious what we can do about it at this point, but force a 00925 # traceback so that it will be somewhat obvious that something 00926 # really bad happened. 00927 # 00928 self.log("Exception in exception callback", rpki.log.warn) 00929 rpki.log.traceback(True) 00930 00931 self.log("Queue: %r" % self.queue) 00932 00933 if self.queue: 00934 self.restart() 00935 00936 ## @var client_queues 00937 # Map of (host, port) tuples to http_queue objects. 00938 client_queues = {} 00939 00940 def client(msg, url, callback, errback): 00941 """ 00942 Open client HTTP connection, send a message, set up callbacks to 00943 handle response. 00944 """ 00945 00946 u = urlparse.urlparse(url) 00947 00948 if (u.scheme not in ("", "http") or 00949 u.username is not None or 00950 u.password is not None or 00951 u.params != "" or 00952 u.query != "" or 00953 u.fragment != ""): 00954 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url 00955 00956 if debug_http: 00957 rpki.log.debug("Contacting %s" % url) 00958 00959 request = http_request( 00960 cmd = "POST", 00961 path = u.path, 00962 body = msg, 00963 callback = callback, 00964 errback = errback, 00965 Host = u.hostname, 00966 Content_Type = rpki_content_type) 00967 00968 hostport = (u.hostname or "localhost", u.port or default_tcp_port) 00969 00970 if debug_http: 00971 rpki.log.debug("Created request %r for %s" % (request, addr_to_string(hostport))) 00972 if hostport not in client_queues: 00973 client_queues[hostport] = http_queue(hostport) 00974 client_queues[hostport].request(request) 00975 00976 # Defer connection attempt until after we've had time to process any 00977 # pending I/O events, in case connections have closed. 00978 00979 if debug_http: 00980 rpki.log.debug("Scheduling connection startup for %r" % request) 00981 rpki.async.defer(client_queues[hostport].restart) 00982 00983 def server(handlers, port, host = ""): 00984 """ 00985 Run an HTTP server and wait (forever) for connections. 00986 """ 00987 00988 if not isinstance(handlers, (tuple, list)): 00989 handlers = (("/", handlers),) 00990 00991 # Yes, this is sick. So is getaddrinfo() returning duplicate 00992 # records, which RedHat has the gall to claim is a feature. 00993 ai = [] 00994 for af in supported_address_families(enable_ipv6_servers): 00995 try: 00996 if host: 00997 h = host 00998 elif have_ipv6 and af == socket.AF_INET6: 00999 h = "::" 01000 else: 01001 h = "0.0.0.0" 01002 for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM): 01003 if a not in ai: 01004 ai.append(a) 01005 except socket.gaierror: 01006 pass 01007 01008 for a in ai: 01009 http_listener(addrinfo = a, handlers = handlers) 01010 01011 rpki.async.event_loop() 01012 01013 class caller(object): 01014 """ 01015 Handle client-side mechanics for protocols based on HTTP, CMS, and 01016 rpki.xml_utils. Calling sequence is intended to nest within 01017 rpki.async.sync_wrapper. 01018 """ 01019 01020 debug = False 01021 01022 def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None): 01023 self.proto = proto 01024 self.client_key = client_key 01025 self.client_cert = client_cert 01026 self.server_ta = server_ta 01027 self.server_cert = server_cert 01028 self.url = url 01029 if debug is not None: 01030 self.debug = debug 01031 01032 def __call__(self, cb, eb, *pdus): 01033 01034 def done(r_der): 01035 """ 01036 Handle CMS-wrapped XML response message. 01037 """ 01038 r_cms = self.proto.cms_msg(DER = r_der) 01039 r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) 01040 if self.debug: 01041 print "<!-- Reply -->" 01042 print r_cms.pretty_print_content() 01043 cb(r_msg) 01044 01045 q_msg = self.proto.msg.query(*pdus) 01046 q_cms = self.proto.cms_msg() 01047 q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert) 01048 if self.debug: 01049 print "<!-- Query -->" 01050 print q_cms.pretty_print_content() 01051 01052 client(url = self.url, msg = q_der, callback = done, errback = eb)