00001 """
00002 HTTP utilities, both client and server.
00003
00004 $Id: http.py 3456 2010-10-04 20:24:27Z sra $
00005
00006 Copyright (C) 2009-2010 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019
00020 Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
00021
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034
00035 import time, socket, asyncore, asynchat, urlparse, sys, random
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import POW
00038
00039
00040
# MIME content type carried in the Content-Type header of every
# message this module sends, and required of every request it accepts.
rpki_content_type = "application/x-rpki"

# When true, log_method() emits debug-level messages and client() logs
# the requests it creates; warnings are logged regardless.
debug_http = False

# Whether http_client streams start out willing to keep the connection
# open after a response (the peer can still force a close).
want_persistent_client = False

# Whether http_server streams start out willing to keep the connection
# open after a reply (the peer can still force a close).
want_persistent_server = False

# Inactivity timeout applied to http_client streams.
default_client_timeout = rpki.sundial.timedelta(minutes = 15)

# Inactivity timeout applied to http_server streams.
default_server_timeout = rpki.sundial.timedelta(minutes = 20)

# HTTP version, as a (major, minor) tuple, used for messages we
# generate unless the caller says otherwise.
default_http_version = (1, 0)

# TCP port client() connects to when the URL does not name one.
default_tcp_port = 443

# Whether server() should also listen on IPv6 (subject to have_ipv6).
enable_ipv6_servers = True

# Whether clients should consider IPv6 addresses (subject to have_ipv6).
enable_ipv6_clients = False

# Whether http_client.start() resolves host names through rpki.adns;
# when false the host string is handed to connect() as an IPv4 target.
use_adns = False

# Probe for usable IPv6 support: we need to be able to create an
# AF_INET6 socket and we need the constants used to set IPV6_V6ONLY.
# Only the failures this probe is expected to produce are caught, so
# that SystemExit and KeyboardInterrupt are not swallowed at import.
try:
  socket.socket(socket.AF_INET6).close()
  socket.IPPROTO_IPV6
  socket.IPV6_V6ONLY
except (socket.error, AttributeError):
  have_ipv6 = False
else:
  have_ipv6 = True
00105
def supported_address_families(enable_ipv6):
  """
  Return the tuple of socket address families to use, both for
  listening servers and for choosing client connection addresses.

  IPv4 is always included; IPv6 is appended only when the caller asks
  for it (enable_ipv6) and this host passed the module's IPv6 probe.
  """
  families = (socket.AF_INET,)
  if enable_ipv6 and have_ipv6:
    families += (socket.AF_INET6,)
  return families
00115
def localhost_addrinfo():
  """
  Build a getaddrinfo-like answer for "localhost" without doing a
  lookup: a list of (address family, address string) pairs.

  The IPv4 loopback is always present; the IPv6 loopback is added
  only when IPv6 clients are enabled and the host supports IPv6.
  """
  if enable_ipv6_clients and have_ipv6:
    return [(socket.AF_INET, "127.0.0.1"), (socket.AF_INET6, "::1")]
  return [(socket.AF_INET, "127.0.0.1")]
00124
class http_message(object):
  """
  Virtual class representing one HTTP message.

  Subclasses are expected to supply parse_first_line() (called with
  the whitespace-separated fields of the start line) and
  format_first_line() (returning the CRLF-terminated start line).
  """

  software_name = "ISC RPKI library"

  def __init__(self, version = None, body = None, headers = None):
    """
    Store version, body and headers, then normalize the headers.
    headers, if given, is a dict; underscores in its keys are turned
    into hyphens so that keyword arguments can name HTTP headers.
    """
    self.version = version
    self.body = body
    self.headers = headers
    self.normalize_headers()

  def normalize_headers(self, headers = None):
    """
    Clean up (some of) the horrible messes that HTTP allows in its
    headers.

    With no argument, renormalizes self.headers in place, translating
    underscores in header names to hyphens.  With an argument (an
    iterable of (name, value) pairs, as produced by the wire parser)
    names are taken as-is.  In both cases names are capitalized
    per hyphen-separated word, values are stripped, and repeated
    headers are merged into one comma-separated value.
    """
    if headers is None:
      headers = () if self.headers is None else self.headers.items()
      translate_underscore = True
    else:
      translate_underscore = False
    result = {}
    for k, v in headers:
      if translate_underscore:
        k = k.replace("_", "-")
      k = "-".join(s.capitalize() for s in k.split("-"))
      v = v.strip()
      if k in result:
        result[k] += ", " + v
      else:
        result[k] = v
    self.headers = result

  @classmethod
  def parse_from_wire(cls, headers):
    """
    Parse and normalize an incoming HTTP message.

    headers is the raw header section (start line plus header lines,
    CRLF-separated, without the terminating blank line).  Returns a
    new instance of cls with the start line and headers filled in.
    """
    self = cls()
    lines = headers.split("\r\n")
    # Tolerate stray blank lines before the start line, as HTTP asks
    # receivers to do.  A completely empty message is left alone so
    # that it still fails in parse_first_line() as it always has.
    while len(lines) > 1 and not lines[0]:
      del lines[0]
    self.parse_first_line(*lines.pop(0).split(None, 2))
    # Unfold continuation lines (those starting with whitespace) onto
    # the header line they continue.  The [:1] slice keeps an empty
    # line from raising IndexError here.
    folded = []
    for line in lines:
      if folded and line[:1].isspace():
        folded[-1] += line
      else:
        folded.append(line)
    self.normalize_headers([h.split(":", 1) for h in folded])
    return self

  def format(self):
    """
    Format an outgoing HTTP message.

    Returns the start line, headers, blank line and (if any) body as
    one string.  Sets Content-Length when there is a body.
    """
    # format_first_line() may add default headers, so it has to run
    # before the headers are serialized.
    s = self.format_first_line()
    if self.body is not None:
      assert isinstance(self.body, str)
      self.headers["Content-Length"] = len(self.body)
    s += "".join("%s: %s\r\n" % kv for kv in self.headers.items())
    s += "\r\n"
    if self.body is not None:
      s += self.body
    return s

  def __str__(self):
    return self.format()

  def parse_version(self, version):
    """
    Parse HTTP version ("HTTP/major.minor") into self.version as a
    tuple of ints.  Raises rpki.exceptions.HTTPBadVersion if the
    string does not start with "HTTP/" or its numbers don't parse.
    """
    if version[:5] != "HTTP/":
      raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)
    try:
      self.version = tuple(int(i) for i in version[5:].split("."))
    except ValueError:
      # Report malformed numbers the same way as a malformed prefix.
      raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)

  def persistent(self):
    """
    Figure out whether this HTTP message encourages a persistent
    connection: HTTP/1.1 unless "Connection: close", HTTP/1.0 only
    with "Connection: keep-alive", never for any other version.
    """
    c = self.headers.get("Connection")
    if self.version == (1, 1):
      return c is None or "close" not in c.lower()
    elif self.version == (1, 0):
      return c is not None and "keep-alive" in c.lower()
    else:
      return False
00212
class http_request(http_message):
  """
  HTTP request message: an http_message plus the command, the path,
  and the callbacks through which the eventual result is delivered.
  """

  def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
    """
    Build a request.  Only POST requests may carry a body.  Extra
    keyword arguments become HTTP headers.
    """
    assert body is None or cmd == "POST"
    http_message.__init__(self, version = version, body = body, headers = headers)
    self.cmd, self.path = cmd, path
    self.callback, self.errback = callback, errback
    self.retried = False

  def parse_first_line(self, cmd, path, version):
    """
    Record the three fields of an incoming request line.  The version
    is validated first, so a bad version leaves cmd and path untouched.
    """
    self.parse_version(version)
    self.cmd, self.path = cmd, path

  def format_first_line(self):
    """
    Produce the request line for an outgoing request, supplying a
    default User-Agent header if the caller did not set one.
    """
    if "User-Agent" not in self.headers:
      self.headers["User-Agent"] = self.software_name
    major, minor = self.version[0], self.version[1]
    return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, major, minor)
00242
class http_response(http_message):
  """
  HTTP response message.
  """

  def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
    """
    Build a response with the given status code and reason phrase.
    Extra keyword arguments become HTTP headers.
    """
    http_message.__init__(self, version = version, body = body, headers = headers)
    self.code = code
    self.reason = reason

  def parse_first_line(self, version, code, reason):
    """
    Parse first line of HTTP response message.  The status code is
    converted to an int; the reason phrase is kept as received.
    """
    self.parse_version(version)
    self.code = int(code)
    self.reason = reason

  def format_first_line(self):
    """
    Format first line of HTTP response message, and set up Date and
    Server headers (unless the caller already supplied them).
    """
    # The header is labelled GMT, so it must be built from gmtime();
    # strftime() without a time tuple would format local time.
    # %H:%M:%S is spelled out because %T is not available everywhere.
    self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
    self.headers.setdefault("Server", self.software_name)
    return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00269
def log_method(self, msg, logger = rpki.log.debug):
  """
  Shared logging helper, installed as the "log" method of several
  classes in this module.  Messages are prefixed with repr(self).

  Debug-level messages are dropped unless debug_http is set; messages
  sent to any other logger are always emitted.
  """
  assert isinstance(logger, rpki.log.logger)
  if not debug_http and logger is rpki.log.debug:
    return
  logger("%r: %s" % (self, msg))
00277
00278 class http_stream(asynchat.async_chat):
00279 """
00280 Virtual class representing an HTTP message stream.
00281 """
00282
00283 log = log_method
00284
00285 def __init__(self, sock = None):
00286 asynchat.async_chat.__init__(self, sock)
00287 self.buffer = []
00288 self.timer = rpki.async.timer(self.handle_timeout)
00289 self.restart()
00290
00291 def restart(self):
00292 """
00293 (Re)start HTTP message parser, reset timer.
00294 """
00295 assert not self.buffer
00296 self.chunk_handler = None
00297 self.set_terminator("\r\n\r\n")
00298 self.update_timeout()
00299
00300 def update_timeout(self):
00301 """
00302 Put this stream's timer in known good state: set it to the
00303 stream's timeout value if we're doing timeouts, otherwise clear
00304 it.
00305 """
00306 if self.timeout is not None:
00307 self.log("Setting timeout %r" % self.timeout)
00308 self.timer.set(self.timeout)
00309 else:
00310 self.log("Clearing timeout")
00311 self.timer.cancel()
00312
00313 def collect_incoming_data(self, data):
00314 """
00315 Buffer incoming data from asynchat.
00316 """
00317 self.buffer.append(data)
00318 self.update_timeout()
00319
00320 def get_buffer(self):
00321 """
00322 Consume data buffered from asynchat.
00323 """
00324 val = "".join(self.buffer)
00325 self.buffer = []
00326 return val
00327
00328 def found_terminator(self):
00329 """
00330 Asynchat reported that it found whatever terminator we set, so
00331 figure out what to do next. This can be messy, because we can be
00332 in any of several different states:
00333
00334 @li We might be handling chunked HTTP, in which case we have to
00335 initialize the chunk decoder;
00336
00337 @li We might have found the end of the message body, in which case
00338 we can (finally) process it; or
00339
00340 @li We might have just gotten to the end of the message headers,
00341 in which case we have to parse them to figure out which of three
00342 separate mechanisms (chunked, content-length, TCP close) is going
00343 to tell us how to find the end of the message body.
00344 """
00345 self.update_timeout()
00346 if self.chunk_handler:
00347 self.chunk_handler()
00348 elif not isinstance(self.get_terminator(), str):
00349 self.handle_body()
00350 else:
00351 self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00352 if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00353 self.msg.body = []
00354 self.chunk_handler = self.chunk_header
00355 self.set_terminator("\r\n")
00356 elif "Content-Length" in self.msg.headers:
00357 self.set_terminator(int(self.msg.headers["Content-Length"]))
00358 else:
00359 self.handle_no_content_length()
00360
00361 def chunk_header(self):
00362 """
00363 Asynchat just handed us what should be the header of one chunk of
00364 a chunked encoding stream. If this chunk has a body, set the
00365 stream up to read it; otherwise, this is the last chunk, so start
00366 the process of exiting the chunk decoder.
00367 """
00368 n = int(self.get_buffer().partition(";")[0], 16)
00369 self.log("Chunk length %s" % n)
00370 if n:
00371 self.chunk_handler = self.chunk_body
00372 self.set_terminator(n)
00373 else:
00374 self.msg.body = "".join(self.msg.body)
00375 self.chunk_handler = self.chunk_discard_trailer
00376
00377 def chunk_body(self):
00378 """
00379 Asynchat just handed us what should be the body of a chunk of the
00380 body of a chunked message (sic). Save it, and prepare to move on
00381 to the next chunk.
00382 """
00383 self.log("Chunk body")
00384 self.msg.body += self.buffer
00385 self.buffer = []
00386 self.chunk_handler = self.chunk_discard_crlf
00387 self.set_terminator("\r\n")
00388
00389 def chunk_discard_crlf(self):
00390 """
00391 Consume the CRLF that terminates a chunk, reinitialize chunk
00392 decoder to be ready for the next chunk.
00393 """
00394 self.log("Chunk CRLF")
00395 s = self.get_buffer()
00396 assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00397 self.chunk_handler = self.chunk_header
00398
00399 def chunk_discard_trailer(self):
00400 """
00401 Consume chunk trailer, which should be empty, then (finally!) exit
00402 the chunk decoder and hand complete message off to the application.
00403 """
00404 self.log("Chunk trailer")
00405 s = self.get_buffer()
00406 assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00407 self.chunk_handler = None
00408 self.handle_message()
00409
00410 def handle_body(self):
00411 """
00412 Hand normal (not chunked) message off to the application.
00413 """
00414 self.msg.body = self.get_buffer()
00415 self.handle_message()
00416
00417 def handle_error(self):
00418 """
00419 Asynchat (or asyncore, or somebody) raised an exception. See
00420 whether it's one we should just pass along, otherwise log a stack
00421 trace and close the stream.
00422 """
00423 etype = sys.exc_info()[0]
00424 if etype in (SystemExit, rpki.async.ExitNow):
00425 self.log("Caught %s, propagating" % etype.__name__)
00426 raise
00427 self.log("Error in HTTP stream handler", rpki.log.warn)
00428 rpki.log.traceback()
00429 if etype not in (rpki.exceptions.HTTPClientAborted,):
00430 self.log("Closing due to error", rpki.log.warn)
00431 self.close()
00432
00433 def handle_timeout(self):
00434 """
00435 Inactivity timer expired, close connection with prejudice.
00436 """
00437 self.log("Timeout, closing")
00438 self.close()
00439
00440 def handle_close(self):
00441 """
00442 Wrapper around asynchat connection close handler, so that we can
00443 log the event.
00444 """
00445 self.log("Close event in HTTP stream handler")
00446 asynchat.async_chat.handle_close(self)
00447
00448 class http_server(http_stream):
00449 """
00450 HTTP(S) server stream.
00451 """
00452
00453
00454
00455 parse_type = http_request
00456
00457
00458
00459 timeout = default_server_timeout
00460
00461 def __init__(self, sock, handlers):
00462 self.log("Starting")
00463 self.handlers = handlers
00464 http_stream.__init__(self, sock = sock)
00465 self.expect_close = not want_persistent_server
00466
00467 def handle_no_content_length(self):
00468 """
00469 Handle an incoming message that used neither chunking nor a
00470 Content-Length header (that is: this message will be the last one
00471 in this server stream). No special action required.
00472 """
00473 self.handle_message()
00474
00475 def find_handler(self, path):
00476 """
00477 Helper method to search self.handlers.
00478 """
00479 for s, h in self.handlers:
00480 if path.startswith(s):
00481 return h
00482 return None
00483
00484 def handle_message(self):
00485 """
00486 HTTP layer managed to deliver a complete HTTP request to
00487 us, figure out what to do with it. Check the command and
00488 Content-Type, look for a handler, and if everything looks right,
00489 pass the message body, path, and a reply callback to the handler.
00490 """
00491 self.log("Received request %s %s" % (self.msg.cmd, self.msg.path))
00492 if not self.msg.persistent():
00493 self.expect_close = True
00494 handler = self.find_handler(self.msg.path)
00495 error = None
00496 if self.msg.cmd != "POST":
00497 error = 501, "No handler for method %s" % self.msg.cmd
00498 elif self.msg.headers["Content-Type"] != rpki_content_type:
00499 error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00500 elif handler is None:
00501 error = 404, "No handler for URL %s" % self.msg.path
00502 if error is None:
00503 try:
00504 handler(self.msg.body, self.msg.path, self.send_reply)
00505 except (rpki.async.ExitNow, SystemExit):
00506 raise
00507 except Exception, e:
00508 rpki.log.traceback()
00509 self.send_error(500, "Unhandled exception %s" % e)
00510 else:
00511 self.send_error(code = error[0], reason = error[1])
00512
00513 def send_error(self, code, reason):
00514 """
00515 Send an error response to this request.
00516 """
00517 self.send_message(code = code, reason = reason)
00518
00519 def send_reply(self, code, body):
00520 """
00521 Send a reply to this request.
00522 """
00523 self.send_message(code = code, body = body)
00524
00525 def send_message(self, code, reason = "OK", body = None):
00526 """
00527 Queue up reply message. If both parties agree that connection is
00528 persistant, and if no error occurred, restart this stream to
00529 listen for next message; otherwise, queue up a close event for
00530 this stream so it will shut down once the reply has been sent.
00531 """
00532 self.log("Sending response %s %s" % (code, reason))
00533 if code >= 400:
00534 self.expect_close = True
00535 msg = http_response(code = code, reason = reason, body = body,
00536 Content_Type = rpki_content_type,
00537 Connection = "Close" if self.expect_close else "Keep-Alive")
00538 self.push(msg.format())
00539 if self.expect_close:
00540 self.log("Closing")
00541 self.timer.cancel()
00542 self.close_when_done()
00543 else:
00544 self.log("Listening for next message")
00545 self.restart()
00546
00547 class http_listener(asyncore.dispatcher):
00548 """
00549 Listener for incoming HTTP(S) connections.
00550 """
00551
00552 log = log_method
00553
00554 def __init__(self, handlers, addrinfo):
00555 self.log("Listener")
00556 asyncore.dispatcher.__init__(self)
00557 self.handlers = handlers
00558 try:
00559 af, socktype, proto, canonname, sockaddr = addrinfo
00560 self.create_socket(af, socktype)
00561 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00562 try:
00563 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00564 except AttributeError:
00565 pass
00566 if have_ipv6 and af == socket.AF_INET6:
00567 self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
00568 self.bind(sockaddr)
00569 self.listen(5)
00570 except:
00571 self.log("Couldn't set up HTTP listener", rpki.log.warn)
00572 rpki.log.traceback()
00573 self.close()
00574 self.log("Listening on %r, handlers %r" % (sockaddr, handlers))
00575
00576 def handle_accept(self):
00577 """
00578 Asyncore says we have an incoming connection, spawn an http_server
00579 stream for it and pass along all of our handler data.
00580 """
00581 self.log("Accepting connection")
00582 try:
00583 s, client = self.accept()
00584 self.log("Accepting connection from %r" % (client,))
00585 http_server(sock = s, handlers = self.handlers)
00586 except (rpki.async.ExitNow, SystemExit):
00587 raise
00588 except:
00589 self.handle_error()
00590
00591 def handle_error(self):
00592 """
00593 Asyncore signaled an error, pass it along or log it.
00594 """
00595 if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
00596 raise
00597 self.log("Error in HTTP listener", rpki.log.warn)
00598 rpki.log.traceback()
00599
00600 class http_client(http_stream):
00601 """
00602 HTTP(S) client stream.
00603 """
00604
00605
00606
00607 parse_type = http_response
00608
00609
00610
00611 timeout = default_client_timeout
00612
00613 def __init__(self, queue, hostport):
00614 self.log("Creating new connection to %r" % (hostport,))
00615 http_stream.__init__(self)
00616 self.queue = queue
00617 self.host = hostport[0]
00618 self.port = hostport[1]
00619 self.state = "opening"
00620 self.expect_close = not want_persistent_client
00621
00622 def start(self):
00623 """
00624 Create socket and request a connection.
00625 """
00626 if not use_adns:
00627 self.gotaddrinfo([(socket.AF_INET, self.host)])
00628 elif self.host == "localhost":
00629 self.gotaddrinfo(localhost_addrinfo())
00630 else:
00631 import rpki.adns
00632 rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, supported_address_families(enable_ipv6_clients))
00633
00634 def dns_error(self, e):
00635 """
00636 Handle DNS lookup errors. For now, just whack the connection.
00637 Undoubtedly we should do something better with diagnostics here.
00638 """
00639 self.handle_error()
00640
00641 def gotaddrinfo(self, addrinfo):
00642 """
00643 Got address data from DNS, create socket and request connection.
00644 """
00645 try:
00646 self.af, self.addr = random.choice(addrinfo)
00647 self.create_socket(self.af, socket.SOCK_STREAM)
00648 self.connect((self.addr, self.port))
00649 except (rpki.async.ExitNow, SystemExit):
00650 raise
00651 except:
00652 self.handle_error()
00653
00654 def handle_connect(self):
00655 """
00656 Asyncore says socket has connected.
00657 """
00658 self.log("Socket connected")
00659 self.set_state("idle")
00660 self.queue.send_request()
00661
00662 def set_state(self, state):
00663 """
00664 Set HTTP client connection state.
00665 """
00666 self.log("State transition %s => %s" % (self.state, state))
00667 self.state = state
00668
00669 def handle_no_content_length(self):
00670 """
00671 Handle response message that used neither chunking nor a
00672 Content-Length header (that is: this message will be the last one
00673 in this server stream). In this case we want to read until we
00674 reach the end of the data stream.
00675 """
00676 self.set_terminator(None)
00677
00678 def send_request(self, msg):
00679 """
00680 Queue up request message and kickstart connection.
00681 """
00682 self.log("Sending request %r" % msg)
00683 assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00684 self.set_state("request-sent")
00685 msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00686 self.push(msg.format())
00687 self.restart()
00688
00689 def handle_message(self):
00690 """
00691 Handle incoming HTTP response message. Make sure we're in a state
00692 where we expect to see such a message (and allow the mysterious
00693 empty messages that Apache sends during connection close, no idea
00694 what that is supposed to be about). If everybody agrees that the
00695 connection should stay open, put it into an idle state; otherwise,
00696 arrange for the stream to shut down.
00697 """
00698
00699 self.log("Message received, state %s" % self.state)
00700
00701 if not self.msg.persistent():
00702 self.expect_close = True
00703
00704 if self.state != "request-sent":
00705 if self.state == "closing":
00706 assert not self.msg.body
00707 self.log("Ignoring empty response received while closing")
00708 return
00709 raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00710
00711 if self.expect_close:
00712 self.log("Closing")
00713 self.set_state("closing")
00714 self.queue.detach(self)
00715 self.close_when_done()
00716 else:
00717 self.log("Idling")
00718 self.set_state("idle")
00719 self.update_timeout()
00720
00721 if self.msg.code != 200:
00722 raise rpki.exceptions.HTTPRequestFailed, "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00723 self.queue.return_result(self.msg)
00724
00725 def handle_close(self):
00726 """
00727 Asyncore signaled connection close. If we were waiting for that
00728 to find the end of a response message, process the resulting
00729 message now; if we were waiting for the response to a request we
00730 sent, signal the error.
00731 """
00732 http_stream.handle_close(self)
00733 self.log("State %s" % self.state)
00734 self.queue.detach(self)
00735 if self.get_terminator() is None:
00736 self.handle_body()
00737 elif self.state == "request-sent":
00738 raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event"
00739
00740 def handle_timeout(self):
00741 """
00742 Connection idle timer has expired. Shut down connection in any
00743 case, noisily if we weren't idle.
00744 """
00745 if self.state != "idle":
00746 self.log("Timeout while in state %s" % self.state, rpki.log.warn)
00747 http_stream.handle_timeout(self)
00748 self.queue.detach(self)
00749 if self.state not in ("idle", "closing"):
00750 try:
00751 raise rpki.exceptions.HTTPTimeout
00752 except rpki.exceptions.HTTPTimeout, e:
00753 self.queue.return_result(e)
00754
00755 def handle_error(self):
00756 """
00757 Asyncore says something threw an exception. Log it, then shut
00758 down the connection and pass back the exception.
00759 """
00760 eclass, edata = sys.exc_info()[0:2]
00761 self.log("Error on HTTP client connection %s:%s: %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn)
00762 http_stream.handle_error(self)
00763 self.queue.detach(self)
00764 self.queue.return_result(edata)
00765
00766 class http_queue(object):
00767 """
00768 Queue of pending HTTP requests for a single destination. This class
00769 is very tightly coupled to http_client; http_client handles the HTTP
00770 stream itself, this class provides a slightly higher-level API.
00771 """
00772
00773 log = log_method
00774
00775 def __init__(self, hostport):
00776 self.log("Creating queue for %r" % (hostport,))
00777 self.hostport = hostport
00778 self.client = None
00779 self.queue = []
00780
00781 def request(self, *requests):
00782 """
00783 Append http_request object(s) to this queue.
00784 """
00785 self.log("Adding requests %r" % requests)
00786 self.queue.extend(requests)
00787
00788 def restart(self):
00789 """
00790 Send next request for this queue, if we can. This may involve
00791 starting a new http_client stream, reusing an existing idle
00792 stream, or just ignoring this request if there's an active client
00793 stream already; in the last case, handling of the response (or
00794 exception, or timeout) for the query currently in progress will
00795 call this method when it's time to kick out the next query.
00796 """
00797 try:
00798 if self.client is None:
00799 self.client = http_client(self, self.hostport)
00800 self.log("Attached client %r" % self.client)
00801 self.client.start()
00802 elif self.client.state == "idle":
00803 self.log("Sending request to existing client %r" % self.client)
00804 self.send_request()
00805 else:
00806 self.log("Client %r exists in state %r" % (self.client, self.client.state))
00807 except (rpki.async.ExitNow, SystemExit):
00808 raise
00809 except Exception, e:
00810 self.return_result(e)
00811
00812 def send_request(self):
00813 """
00814 Kick out the next query in this queue, if any.
00815 """
00816 if self.queue:
00817 self.client.send_request(self.queue[0])
00818
00819 def detach(self, client_):
00820 """
00821 Detatch a client from this queue. Silently ignores attempting to
00822 detach a client that is not attached to this queue, to simplify
00823 handling of what otherwise would be a nasty set of race
00824 conditions.
00825 """
00826 if client_ is self.client:
00827 self.log("Detaching client %r" % client_)
00828 self.client = None
00829
00830 def return_result(self, result):
00831 """
00832 Client stream has returned a result, which we need to pass along
00833 to the original caller. Result may be either an HTTP response
00834 message or an exception. In either case, once we're done
00835 processing this result, kick off next message in the queue, if any.
00836 """
00837
00838 try:
00839 req = self.queue.pop(0)
00840 self.log("Dequeuing request %r" % req)
00841 except IndexError:
00842 self.log("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn)
00843 return
00844
00845 try:
00846 if isinstance(result, http_response):
00847 self.log("Returning result %r to caller" % result)
00848 req.callback(result.body)
00849 else:
00850 assert isinstance(result, Exception)
00851 self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
00852 req.errback(result)
00853 except (rpki.async.ExitNow, SystemExit):
00854 raise
00855 except:
00856 self.log("Unhandled exception from callback")
00857 rpki.log.traceback()
00858
00859 self.log("Queue: %r" % self.queue)
00860
00861 if self.queue:
00862 self.restart()
00863
00864
00865
00866 client_queues = {}
00867
00868 def client(msg, url, callback, errback):
00869 """
00870 Open client HTTP connection, send a message, set up callbacks to
00871 handle response.
00872 """
00873
00874 u = urlparse.urlparse(url)
00875
00876 if (u.scheme not in ("", "http") or
00877 u.username is not None or
00878 u.password is not None or
00879 u.params != "" or
00880 u.query != "" or
00881 u.fragment != ""):
00882 raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
00883
00884 if debug_http:
00885 rpki.log.debug("Contacting %s" % url)
00886
00887 request = http_request(
00888 cmd = "POST",
00889 path = u.path,
00890 body = msg,
00891 callback = callback,
00892 errback = errback,
00893 Host = u.hostname,
00894 Content_Type = rpki_content_type)
00895
00896 hostport = (u.hostname or "localhost", u.port or default_tcp_port)
00897
00898 if debug_http:
00899 rpki.log.debug("Created request %r for %r" % (request, hostport))
00900 if hostport not in client_queues:
00901 client_queues[hostport] = http_queue(hostport)
00902 client_queues[hostport].request(request)
00903
00904
00905
00906
00907 if debug_http:
00908 rpki.log.debug("Scheduling connection startup for %r" % request)
00909 rpki.async.defer(client_queues[hostport].restart)
00910
00911 def server(handlers, port, host = ""):
00912 """
00913 Run an HTTP server and wait (forever) for connections.
00914 """
00915
00916 if not isinstance(handlers, (tuple, list)):
00917 handlers = (("/", handlers),)
00918
00919
00920
00921 ai = []
00922 for af in supported_address_families(enable_ipv6_servers):
00923 try:
00924 if host:
00925 h = host
00926 elif have_ipv6 and af == socket.AF_INET6:
00927 h = "::"
00928 else:
00929 h = "0.0.0.0"
00930 for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
00931 if a not in ai:
00932 ai.append(a)
00933 except socket.gaierror:
00934 pass
00935
00936 for a in ai:
00937 http_listener(addrinfo = a, handlers = handlers)
00938
00939 rpki.async.event_loop()
00940
class caller(object):
  """
  Client-side plumbing for protocols layered on HTTP, CMS and
  rpki.xml_utils.  An instance is callable; the calling sequence is
  meant to nest inside rpki.async.sync_wrapper.
  """

  # Class-wide default; overridden per instance only on request.
  debug = False

  def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
    """
    Remember the protocol module, the keys and certificates used to
    sign queries and check replies, and the server URL.
    """
    self.url = url
    self.proto = proto
    self.client_key, self.client_cert = client_key, client_cert
    self.server_ta, self.server_cert = server_ta, server_cert
    if debug is not None:
      self.debug = debug

  def __call__(self, cb, eb, *pdus):
    """
    Wrap pdus in a signed query, POST it to the server, and arrange
    for cb to get the unwrapped reply or eb to get the error.
    """

    def done(r_der):
      """
      Unwrap the CMS-wrapped XML reply and pass it on to cb.
      """
      reply_cms = self.proto.cms_msg(DER = r_der)
      reply_msg = reply_cms.unwrap((self.server_ta, self.server_cert))
      if self.debug:
        print("<!-- Reply -->")
        print(reply_cms.pretty_print_content())
      cb(reply_msg)

    query_msg = self.proto.msg.query(*pdus)
    query_cms = self.proto.cms_msg()
    query_der = query_cms.wrap(query_msg, self.client_key, self.client_cert)
    if self.debug:
      print("<!-- Query -->")
      print(query_cms.pretty_print_content())

    client(url = self.url, msg = query_der, callback = done, errback = eb)