RPKI Engine 1.0

http.py (3793)

Go to the documentation of this file.
00001 """
00002 HTTP utilities, both client and server.
00003 
00004 $Id: http.py 3793 2011-04-27 04:34:52Z sra $
00005 
00006 Copyright (C) 2009-2011  Internet Systems Consortium ("ISC")
00007 
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011 
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 
00020 Portions copyright (C) 2007--2008  American Registry for Internet Numbers ("ARIN")
00021 
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025 
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS.  IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034 
00035 import time, socket, asyncore, asynchat, urlparse, sys, random
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import rpki.POW
00038 
00039 ## @var rpki_content_type
00040 # HTTP content type used for all RPKI messages.
00041 rpki_content_type = "application/x-rpki"
00042 
00043 ## @var debug_http
00044 # Verbose chatter about HTTP streams.
00045 debug_http = False
00046 
00047 ## @var want_persistent_client
00048 # Whether we want persistent HTTP client streams, when server also supports them.
00049 want_persistent_client = False
00050 
00051 ## @var want_persistent_server
00052 # Whether we want persistent HTTP server streams, when client also supports them.
00053 want_persistent_server = False
00054 
00055 ## @var default_client_timeout
00056 # Default HTTP client connection timeout.
00057 default_client_timeout = rpki.sundial.timedelta(minutes = 15)
00058 
00059 ## @var default_server_timeout
00060 # Default HTTP server connection timeouts.  Given our druthers, we'd
00061 # prefer that the client close the connection, as this avoids the
00062 # problem of client starting to reuse connection just as server closes
00063 # it, so this should be longer than the client timeout.
00064 default_server_timeout = rpki.sundial.timedelta(minutes = 20)
00065 
00066 ## @var default_http_version
00067 # Preferred HTTP version.
00068 default_http_version = (1, 0)
00069 
00070 ## @var default_tcp_port
00071 # Default port for clients and servers that don't specify one.
00072 default_tcp_port = 80
00073 
00074 ## @var enable_ipv6_servers
00075 # Whether to enable IPv6 listeners.  Enabled by default, as it should
00076 # be harmless.  Has no effect if kernel doesn't support IPv6.
00077 enable_ipv6_servers = True
00078 
00079 ## @var enable_ipv6_clients
00080 # Whether to consider IPv6 addresses when making connections.
00081 # Disabled by default, as IPv6 connectivity is still a bad joke in
00082 # far too much of the world.
00083 enable_ipv6_clients = False
00084 
00085 ## @var use_adns
00086 # Whether to use rpki.adns code.  This is still experimental, so it's
00087 # not (yet) enabled by default.
00088 use_adns = False
00089 
00090 ## @var have_ipv6
00091 # Whether the current machine claims to support IPv6.  Note that just
00092 # because the kernel supports it doesn't mean that the machine has
00093 # usable IPv6 connectivity.  I don't know of a simple portable way to
00094 # probe for connectivity at runtime (the old test of "can you ping
00095 # SRI-NIC.ARPA?" seems a bit dated...).  Don't set this, it's set
00096 # automatically by probing using the socket() system call at runtime.
00097 try:
00098   socket.socket(socket.AF_INET6).close()
00099   socket.IPPROTO_IPV6
00100   socket.IPV6_V6ONLY
00101 except:
00102   have_ipv6 = False
00103 else:
00104   have_ipv6 = True
00105 
00106 def supported_address_families(enable_ipv6):
00107   """
00108   IP address families on which servers should listen, and to consider
00109   when selecting addresses for client connections.
00110   """
00111   if enable_ipv6 and have_ipv6:
00112     return (socket.AF_INET, socket.AF_INET6)
00113   else:
00114     return (socket.AF_INET,)
00115 
00116 def localhost_addrinfo():
00117   """
00118   Return pseudo-getaddrinfo results for localhost.
00119   """
00120   result = [(socket.AF_INET, "127.0.0.1")]
00121   if enable_ipv6_clients and have_ipv6:
00122     result.append((socket.AF_INET6, "::1"))
00123   return result
00124 
00125 class http_message(object):
00126   """
00127   Virtual class representing of one HTTP message.
00128   """
00129 
00130   software_name = "ISC RPKI library"
00131 
00132   def __init__(self, version = None, body = None, headers = None):
00133     self.version = version
00134     self.body = body
00135     self.headers = headers
00136     self.normalize_headers()
00137 
00138   def normalize_headers(self, headers = None):
00139     """
00140     Clean up (some of) the horrible messes that HTTP allows in its
00141     headers.
00142     """
00143     if headers is None:
00144       headers = () if self.headers is None else self.headers.items()
00145       translate_underscore = True
00146     else:
00147       translate_underscore = False
00148     result = {}
00149     for k, v in headers:
00150       if translate_underscore:
00151         k = k.replace("_", "-")
00152       k = "-".join(s.capitalize() for s in k.split("-"))
00153       v = v.strip()
00154       if k in result:
00155         result[k] += ", " + v
00156       else:
00157         result[k] = v
00158     self.headers = result
00159 
00160   @classmethod
00161   def parse_from_wire(cls, headers):
00162     """
00163     Parse and normalize an incoming HTTP message.
00164     """
00165     self = cls()
00166     headers = headers.split("\r\n")
00167     self.parse_first_line(*headers.pop(0).split(None, 2))
00168     for i in xrange(len(headers) - 2, -1, -1):
00169       if headers[i + 1][0].isspace():
00170         headers[i] += headers[i + 1]
00171         del headers[i + 1]
00172     self.normalize_headers([h.split(":", 1) for h in headers])
00173     return self
00174 
00175   def format(self):
00176     """
00177     Format an outgoing HTTP message.
00178     """
00179     s = self.format_first_line()
00180     if self.body is not None:
00181       assert isinstance(self.body, str)
00182       self.headers["Content-Length"] = len(self.body)
00183     for kv in self.headers.iteritems():
00184       s += "%s: %s\r\n" % kv
00185     s += "\r\n"
00186     if self.body is not None:
00187       s += self.body
00188     return s
00189 
00190   def __str__(self):
00191     return self.format()
00192 
00193   def parse_version(self, version):
00194     """
00195     Parse HTTP version, raise an exception if we can't.
00196     """
00197     if version[:5] != "HTTP/":
00198       raise rpki.exceptions.HTTPBadVersion, "Couldn't parse version %s" % version
00199     self.version = tuple(int(i) for i in version[5:].split("."))
00200 
00201   @property
00202   def persistent(self):
00203     """
00204     Figure out whether this HTTP message encourages a persistent connection.
00205     """
00206     c = self.headers.get("Connection")
00207     if self.version == (1, 1):
00208       return c is None or "close" not in c.lower()
00209     elif self.version == (1, 0):
00210       return c is not None and "keep-alive" in c.lower()
00211     else:
00212       return False
00213 
00214 class http_request(http_message):
00215   """
00216   HTTP request message.
00217   """
00218 
00219   def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
00220     assert cmd == "POST" or body is None
00221     http_message.__init__(self, version = version, body = body, headers = headers)
00222     self.cmd = cmd
00223     self.path = path
00224     self.callback = callback
00225     self.errback = errback
00226     self.retried = False
00227 
00228   def parse_first_line(self, cmd, path, version):
00229     """
00230     Parse first line of HTTP request message.
00231     """
00232     self.parse_version(version)
00233     self.cmd = cmd
00234     self.path = path
00235 
00236   def format_first_line(self):
00237     """
00238     Format first line of HTTP request message, and set up the
00239     User-Agent header.
00240     """
00241     self.headers.setdefault("User-Agent", self.software_name)
00242     return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
00243 
00244   def __repr__(self):
00245     return rpki.log.log_repr(self, self.cmd, self.path)
00246             
00247 class http_response(http_message):
00248   """
00249   HTTP response message.
00250   """
00251 
00252   def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
00253     http_message.__init__(self, version = version, body = body, headers = headers)
00254     self.code = code
00255     self.reason = reason
00256 
00257   def parse_first_line(self, version, code, reason):
00258     """
00259     Parse first line of HTTP response message.
00260     """
00261     self.parse_version(version)
00262     self.code = int(code)
00263     self.reason = reason
00264 
00265   def format_first_line(self):
00266     """
00267     Format first line of HTTP response message, and set up Date and
00268     Server headers.
00269     """
00270     self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
00271     self.headers.setdefault("Server", self.software_name)
00272     return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00273 
00274   def __repr__(self):
00275     return rpki.log.log_repr(self, self.code, self.reason)
00276 
00277 def log_method(self, msg, logger = rpki.log.debug):
00278   """
00279   Logging method used in several different classes.
00280   """
00281   assert isinstance(logger, rpki.log.logger)
00282   if debug_http or logger is not rpki.log.debug:
00283     logger("%r: %s" % (self, msg))
00284 
00285 def addr_to_string(addr):
00286   """
00287   Convert socket addr tuple to printable string.  Assumes 2-element
00288   tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
00289   anything else.
00290   """
00291 
00292   if len(addr) == 2:
00293     return "%s:%d" % (addr[0], addr[1])
00294   if len(addr) == 4:
00295     return "%s.%d" % (addr[0], addr[1])
00296   raise TypeError
00297 
00298 class http_stream(asynchat.async_chat):
00299   """
00300   Virtual class representing an HTTP message stream.
00301   """
00302 
00303   log = log_method
00304   show_tracebacks = False
00305 
00306   def __repr__(self):
00307     status = ["connected"] if self.connected else []
00308     try:
00309       status.append(addr_to_string(self.addr))
00310     except TypeError:
00311       pass
00312     return rpki.log.log_repr(self, *status)
00313 
00314   def __init__(self, sock = None):
00315     asynchat.async_chat.__init__(self, sock)
00316     self.buffer = []
00317     self.timer = rpki.async.timer(self.handle_timeout)
00318     self.restart()
00319 
00320   def restart(self):
00321     """
00322     (Re)start HTTP message parser, reset timer.
00323     """
00324     assert not self.buffer
00325     self.chunk_handler = None
00326     self.set_terminator("\r\n\r\n")
00327     self.update_timeout()
00328 
00329   def update_timeout(self):
00330     """
00331     Put this stream's timer in known good state: set it to the
00332     stream's timeout value if we're doing timeouts, otherwise clear
00333     it.
00334     """
00335     if self.timeout is not None:
00336       self.log("Setting timeout %s" % self.timeout)
00337       self.timer.set(self.timeout)
00338     else:
00339       self.log("Clearing timeout")
00340       self.timer.cancel()
00341 
00342   def collect_incoming_data(self, data):
00343     """
00344     Buffer incoming data from asynchat.
00345     """
00346     self.buffer.append(data)
00347     self.update_timeout()
00348 
00349   def get_buffer(self):
00350     """
00351     Consume data buffered from asynchat.
00352     """
00353     val = "".join(self.buffer)
00354     self.buffer = []
00355     return val
00356 
00357   def found_terminator(self):
00358     """
00359     Asynchat reported that it found whatever terminator we set, so
00360     figure out what to do next.  This can be messy, because we can be
00361     in any of several different states:
00362 
00363     @li We might be handling chunked HTTP, in which case we have to
00364     initialize the chunk decoder;
00365 
00366     @li We might have found the end of the message body, in which case
00367     we can (finally) process it; or
00368 
00369     @li We might have just gotten to the end of the message headers,
00370     in which case we have to parse them to figure out which of three
00371     separate mechanisms (chunked, content-length, TCP close) is going
00372     to tell us how to find the end of the message body.
00373     """
00374     self.update_timeout()
00375     if self.chunk_handler:
00376       self.chunk_handler()
00377     elif not isinstance(self.get_terminator(), str):
00378       self.handle_body()
00379     else:
00380       self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00381       if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00382         self.msg.body = []
00383         self.chunk_handler = self.chunk_header
00384         self.set_terminator("\r\n")
00385       elif "Content-Length" in self.msg.headers:
00386         self.set_terminator(int(self.msg.headers["Content-Length"]))
00387       else:
00388         self.handle_no_content_length()
00389       
00390   def chunk_header(self):
00391     """
00392     Asynchat just handed us what should be the header of one chunk of
00393     a chunked encoding stream.  If this chunk has a body, set the
00394     stream up to read it; otherwise, this is the last chunk, so start
00395     the process of exiting the chunk decoder.
00396     """
00397     n = int(self.get_buffer().partition(";")[0], 16)
00398     self.log("Chunk length %s" % n)
00399     if n:
00400       self.chunk_handler = self.chunk_body
00401       self.set_terminator(n)
00402     else:
00403       self.msg.body = "".join(self.msg.body)
00404       self.chunk_handler = self.chunk_discard_trailer
00405 
00406   def chunk_body(self):
00407     """
00408     Asynchat just handed us what should be the body of a chunk of the
00409     body of a chunked message (sic).  Save it, and prepare to move on
00410     to the next chunk.
00411     """
00412     self.log("Chunk body")
00413     self.msg.body += self.buffer
00414     self.buffer = []
00415     self.chunk_handler = self.chunk_discard_crlf
00416     self.set_terminator("\r\n")
00417 
00418   def chunk_discard_crlf(self):
00419     """
00420     Consume the CRLF that terminates a chunk, reinitialize chunk
00421     decoder to be ready for the next chunk.
00422     """
00423     self.log("Chunk CRLF")
00424     s = self.get_buffer()
00425     assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00426     self.chunk_handler = self.chunk_header
00427 
00428   def chunk_discard_trailer(self):
00429     """
00430     Consume chunk trailer, which should be empty, then (finally!) exit
00431     the chunk decoder and hand complete message off to the application.
00432     """
00433     self.log("Chunk trailer")
00434     s = self.get_buffer()
00435     assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00436     self.chunk_handler = None
00437     self.handle_message()
00438 
00439   def handle_body(self):
00440     """
00441     Hand normal (not chunked) message off to the application.
00442     """
00443     self.msg.body = self.get_buffer()
00444     self.handle_message()
00445 
00446   def handle_error(self):
00447     """
00448     Asynchat (or asyncore, or somebody) raised an exception.  See
00449     whether it's one we should just pass along, otherwise log a stack
00450     trace and close the stream.
00451     """
00452     self.timer.cancel()
00453     etype = sys.exc_info()[0]
00454     if etype in (SystemExit, rpki.async.ExitNow):
00455       raise
00456     if self.show_tracebacks:
00457       self.log("Error in HTTP stream handler", rpki.log.warn)
00458       rpki.log.traceback()
00459     if etype is not rpki.exceptions.HTTPClientAborted:
00460       self.log("Closing due to error", rpki.log.warn)
00461       self.close()
00462 
00463   def handle_timeout(self):
00464     """
00465     Inactivity timer expired, close connection with prejudice.
00466     """
00467     self.log("Timeout, closing")
00468     self.close()
00469 
00470   def handle_close(self):
00471     """
00472     Wrapper around asynchat connection close handler, so that we can
00473     log the event, cancel timer, and so forth.
00474     """
00475     self.log("Close event in HTTP stream handler")
00476     self.timer.cancel()
00477     asynchat.async_chat.handle_close(self)
00478 
00479 class http_server(http_stream):
00480   """
00481   HTTP server stream.
00482   """
00483 
00484   ## @var parse_type
00485   # Stream parser should look for incoming HTTP request messages.
00486   parse_type = http_request
00487 
00488   ## @var timeout
00489   # Use the default server timeout value set in the module header.
00490   timeout = default_server_timeout
00491 
00492   def __init__(self, sock, handlers):
00493     self.handlers = handlers
00494     http_stream.__init__(self, sock = sock)
00495     self.expect_close = not want_persistent_server
00496     self.log("Starting")
00497 
00498   def handle_no_content_length(self):
00499     """
00500     Handle an incoming message that used neither chunking nor a
00501     Content-Length header (that is: this message will be the last one
00502     in this server stream).  No special action required.
00503     """
00504     self.handle_message()
00505 
00506   def find_handler(self, path):
00507     """
00508     Helper method to search self.handlers.
00509     """
00510     for s, h in self.handlers:
00511       if path.startswith(s):
00512         return h
00513     return None
00514 
00515   def handle_message(self):
00516     """
00517     HTTP layer managed to deliver a complete HTTP request to
00518     us, figure out what to do with it.  Check the command and
00519     Content-Type, look for a handler, and if everything looks right,
00520     pass the message body, path, and a reply callback to the handler.
00521     """
00522     self.log("Received request %r" % self.msg)
00523     if not self.msg.persistent:
00524       self.expect_close = True
00525     handler = self.find_handler(self.msg.path)
00526     error = None
00527     if self.msg.cmd != "POST":
00528       error = 501, "No handler for method %s" % self.msg.cmd
00529     elif self.msg.headers["Content-Type"] != rpki_content_type:
00530       error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00531     elif handler is None:
00532       error = 404, "No handler for URL %s" % self.msg.path
00533     if error is None:
00534       try:
00535         handler(self.msg.body, self.msg.path, self.send_reply)
00536       except (rpki.async.ExitNow, SystemExit):
00537         raise
00538       except Exception, e:
00539         if self.show_tracebacks:
00540           rpki.log.traceback()
00541         self.send_error(500, "Unhandled exception %s" % e)
00542     else:
00543       self.send_error(code = error[0], reason = error[1])
00544 
00545   def send_error(self, code, reason):
00546     """
00547     Send an error response to this request.
00548     """
00549     self.send_message(code = code, reason = reason)
00550 
00551   def send_reply(self, code, body = None, reason = "OK"):
00552     """
00553     Send a reply to this request.
00554     """
00555     self.send_message(code = code, body = body, reason = reason)
00556 
00557   def send_message(self, code, reason = "OK", body = None):
00558     """
00559     Queue up reply message.  If both parties agree that connection is
00560     persistant, and if no error occurred, restart this stream to
00561     listen for next message; otherwise, queue up a close event for
00562     this stream so it will shut down once the reply has been sent.
00563     """
00564     self.log("Sending response %s %s" % (code, reason))
00565     if code >= 400:
00566       self.expect_close = True
00567     msg = http_response(code = code, reason = reason, body = body,
00568                         Content_Type = rpki_content_type,
00569                         Connection = "Close" if self.expect_close else "Keep-Alive")
00570     self.push(msg.format())
00571     if self.expect_close:
00572       self.log("Closing")
00573       self.timer.cancel()
00574       self.close_when_done()
00575     else:      
00576       self.log("Listening for next message")
00577       self.restart()
00578 
00579 class http_listener(asyncore.dispatcher):
00580   """
00581   Listener for incoming HTTP connections.
00582   """
00583 
00584   log = log_method
00585   show_tracebacks = False
00586 
00587   def __repr__(self):
00588     try:
00589       status = (addr_to_string(self.addr),)
00590     except TypeError:
00591       status = ()
00592     return rpki.log.log_repr(self, *status)
00593 
00594   def __init__(self, handlers, addrinfo):
00595     asyncore.dispatcher.__init__(self)
00596     self.handlers = handlers
00597     try:
00598       af, socktype, proto, canonname, sockaddr = addrinfo
00599       self.create_socket(af, socktype)
00600       self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00601       try:
00602         self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00603       except AttributeError:
00604         pass
00605       if have_ipv6 and af == socket.AF_INET6:
00606         self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
00607       self.bind(sockaddr)
00608       self.listen(5)
00609     except:
00610       self.log("Couldn't set up HTTP listener", rpki.log.warn)
00611       if self.show_tracebacks:
00612         rpki.log.traceback()
00613       self.close()
00614     for h in handlers:
00615       self.log("Handling %s" % h[0])
00616 
00617   def handle_accept(self):
00618     """
00619     Asyncore says we have an incoming connection, spawn an http_server
00620     stream for it and pass along all of our handler data.
00621     """
00622     try:
00623       s, client = self.accept()
00624       self.log("Accepting connection from %s" % addr_to_string(client))
00625       http_server(sock = s, handlers = self.handlers)
00626     except (rpki.async.ExitNow, SystemExit):
00627       raise
00628     except:
00629       self.log("Unable to accept connection")
00630       self.handle_error()
00631 
00632   def handle_error(self):
00633     """
00634     Asyncore signaled an error, pass it along or log it.
00635     """
00636     if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
00637       raise
00638     self.log("Error in HTTP listener", rpki.log.warn)
00639     if self.show_tracebacks:
00640       rpki.log.traceback()
00641 
00642 class http_client(http_stream):
00643   """
00644   HTTP client stream.
00645   """
00646 
00647   ## @var parse_type
00648   # Stream parser should look for incoming HTTP response messages.
00649   parse_type = http_response
00650 
00651   ## @var timeout
00652   # Use the default client timeout value set in the module header.
00653   timeout = default_client_timeout
00654 
00655   ## @var state
00656   # Application layer connection state.
00657   state = None
00658 
00659   def __init__(self, queue, hostport):
00660     self.log("Creating new connection to %s" % addr_to_string(hostport))
00661     http_stream.__init__(self)
00662     self.queue = queue
00663     self.host = hostport[0]
00664     self.port = hostport[1]
00665     self.set_state("opening")
00666     self.expect_close = not want_persistent_client
00667 
00668   def start(self):
00669     """
00670     Create socket and request a connection.
00671     """
00672     if not use_adns:
00673       self.log("Not using ADNS")
00674       self.gotaddrinfo([(socket.AF_INET, self.host)])
00675     elif self.host == "localhost":
00676       self.log("Bypassing DNS for localhost")
00677       self.gotaddrinfo(localhost_addrinfo())
00678     else:
00679       import rpki.adns                  # This should move to start of file once we've decided to inflict it on all users
00680       families = supported_address_families(enable_ipv6_clients)
00681       self.log("Starting ADNS lookup for %s in families %r" % (self.host, families))
00682       rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)
00683 
00684   def dns_error(self, e):
00685     """
00686     Handle DNS lookup errors.  For now, just whack the connection.
00687     Undoubtedly we should do something better with diagnostics here.
00688     """
00689     self.handle_error()
00690 
00691   def gotaddrinfo(self, addrinfo):
00692     """
00693     Got address data from DNS, create socket and request connection.
00694     """
00695     try:
00696       self.af, self.address = random.choice(addrinfo)
00697       self.log("Connecting to AF %s host %s port %s addr %s" % (self.af, self.host, self.port, self.address))
00698       self.create_socket(self.af, socket.SOCK_STREAM)
00699       self.connect((self.address, self.port))
00700       if self.addr is None:
00701         self.addr = (self.host, self.port)
00702     except (rpki.async.ExitNow, SystemExit):
00703       raise
00704     except:
00705       self.handle_error()
00706 
00707   def handle_connect(self):
00708     """
00709     Asyncore says socket has connected.
00710     """
00711     self.log("Socket connected")
00712     self.set_state("idle")
00713     assert self.queue.client is self
00714     self.queue.send_request()
00715 
00716   def set_state(self, state):
00717     """
00718     Set HTTP client connection state.
00719     """
00720     self.log("State transition %s => %s" % (self.state, state))
00721     self.state = state
00722 
00723   def handle_no_content_length(self):
00724     """
00725     Handle response message that used neither chunking nor a
00726     Content-Length header (that is: this message will be the last one
00727     in this server stream).  In this case we want to read until we
00728     reach the end of the data stream.
00729     """
00730     self.set_terminator(None)
00731 
00732   def send_request(self, msg):
00733     """
00734     Queue up request message and kickstart connection.
00735     """
00736     self.log("Sending request %r" % msg)
00737     assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00738     self.set_state("request-sent")
00739     msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00740     self.push(msg.format())
00741     self.restart()
00742 
00743   def handle_message(self):
00744     """
00745     Handle incoming HTTP response message.  Make sure we're in a state
00746     where we expect to see such a message (and allow the mysterious
00747     empty messages that Apache sends during connection close, no idea
00748     what that is supposed to be about).  If everybody agrees that the
00749     connection should stay open, put it into an idle state; otherwise,
00750     arrange for the stream to shut down.
00751     """
00752 
00753     self.log("Message received, state %s" % self.state)
00754 
00755     if not self.msg.persistent:
00756       self.expect_close = True
00757 
00758     if self.state != "request-sent":
00759       if self.state == "closing":
00760         assert not self.msg.body
00761         self.log("Ignoring empty response received while closing")
00762         return
00763       raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00764 
00765     if self.expect_close:
00766       self.log("Closing")
00767       self.set_state("closing")
00768       self.close_when_done()
00769     else:
00770       self.log("Idling")
00771       self.set_state("idle")
00772       self.update_timeout()
00773 
00774     if self.msg.code != 200:
00775       raise rpki.exceptions.HTTPRequestFailed, "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00776     self.queue.return_result(self, self.msg, detach = self.expect_close)
00777 
00778   def handle_close(self):
00779     """
00780     Asyncore signaled connection close.  If we were waiting for that
00781     to find the end of a response message, process the resulting
00782     message now; if we were waiting for the response to a request we
00783     sent, signal the error.
00784     """
00785     http_stream.handle_close(self)
00786     self.log("State %s" % self.state)
00787     if self.get_terminator() is None:
00788       self.handle_body()
00789     elif self.state == "request-sent":
00790       raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event"
00791     else:
00792       self.queue.detach(self)
00793 
00794   def handle_timeout(self):
00795     """
00796     Connection idle timer has expired.  Shut down connection in any
00797     case, noisily if we weren't idle.
00798     """
00799     bad = self.state not in ("idle", "closing")
00800     if bad:
00801       self.log("Timeout while in state %s" % self.state, rpki.log.warn)
00802     http_stream.handle_timeout(self)
00803     if bad:
00804       raise rpki.exceptions.HTTPTimeout
00805     else:
00806       self.queue.detach(self)
00807 
00808   def handle_error(self):
00809     """
00810     Asyncore says something threw an exception.  Log it, then shut
00811     down the connection and pass back the exception.
00812     """
00813     eclass, edata = sys.exc_info()[0:2]
00814     self.log("Error on HTTP client connection %s:%s %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn)
00815     http_stream.handle_error(self)
00816     self.queue.return_result(self, edata, detach = True)
00817 
00818 class http_queue(object):
00819   """
00820   Queue of pending HTTP requests for a single destination.  This class
00821   is very tightly coupled to http_client; http_client handles the HTTP
00822   stream itself, this class provides a slightly higher-level API.
00823   """
00824 
00825   log = log_method
00826 
00827   def __repr__(self):
00828     return rpki.log.log_repr(self, "%s" % addr_to_string(self.hostport))
00829 
00830   def __init__(self, hostport):
00831     self.hostport = hostport
00832     self.client = None
00833     self.log("Created")
00834     self.queue = []
00835 
00836   def request(self, *requests):
00837     """
00838     Append http_request object(s) to this queue.
00839     """
00840     self.log("Adding requests %r" % requests)
00841     self.queue.extend(requests)
00842 
00843   def restart(self):
00844     """
00845     Send next request for this queue, if we can.  This may involve
00846     starting a new http_client stream, reusing an existing idle
00847     stream, or just ignoring this request if there's an active client
00848     stream already; in the last case, handling of the response (or
00849     exception, or timeout) for the query currently in progress will
00850     call this method when it's time to kick out the next query.
00851     """
00852     try:
00853       if self.client is None:
00854         self.client = http_client(self, self.hostport)
00855         self.log("Attached client %r" % self.client)
00856         self.client.start()
00857       elif self.client.state == "idle":
00858         self.log("Sending request to existing client %r" % self.client)
00859         self.send_request()
00860       else:
00861         self.log("Client %r exists in state %r" % (self.client, self.client.state))
00862     except (rpki.async.ExitNow, SystemExit):
00863       raise
00864     except Exception, e:
00865       self.return_result(self.client, e, detach = True)
00866 
00867   def send_request(self):
00868     """
00869     Kick out the next query in this queue, if any.
00870     """
00871     if self.queue:
00872       self.client.send_request(self.queue[0])
00873 
00874   def detach(self, client_):
00875     """
00876     Detatch a client from this queue.  Silently ignores attempting to
00877     detach a client that is not attached to this queue, to simplify
00878     handling of what otherwise would be a nasty set of race
00879     conditions.
00880     """
00881     if client_ is self.client:
00882       self.log("Detaching client %r" % client_)
00883       self.client = None
00884 
00885   def return_result(self, client, result, detach = False):
00886     """
00887     Client stream has returned a result, which we need to pass along
00888     to the original caller.  Result may be either an HTTP response
00889     message or an exception.  In either case, once we're done
00890     processing this result, kick off next message in the queue, if any.
00891     """
00892 
00893     if client is not self.client:
00894       self.log("Wrong client trying to return result.  THIS SHOULD NOT HAPPEN.  Dropping result %r" % result, rpki.log.warn)
00895       return
00896 
00897     if detach:
00898       self.detach(client)
00899 
00900     try:
00901       req = self.queue.pop(0)
00902       self.log("Dequeuing request %r" % req)
00903     except IndexError:
00904       self.log("No caller.  THIS SHOULD NOT HAPPEN.  Dropping result %r" % result, rpki.log.warn)
00905       return
00906 
00907     try:
00908       if isinstance(result, http_response):
00909         self.log("Returning result %r to caller" % result)
00910         req.callback(result.body)
00911       else:
00912         assert isinstance(result, Exception)
00913         self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
00914         req.errback(result)
00915     except (rpki.async.ExitNow, SystemExit):
00916       raise
00917     except:
00918       self.log("Unhandled exception from callback")
00919       rpki.log.traceback()
00920 
00921     self.log("Queue: %r" % self.queue)
00922 
00923     if self.queue:
00924       self.restart()
00925 
00926 ## @var client_queues
00927 # Map of (host, port) tuples to http_queue objects.
00928 client_queues = {}
00929 
00930 def client(msg, url, callback, errback):
00931   """
00932   Open client HTTP connection, send a message, set up callbacks to
00933   handle response.
00934   """
00935 
00936   u = urlparse.urlparse(url)
00937 
00938   if (u.scheme not in ("", "http") or
00939       u.username is not None or
00940       u.password is not None or
00941       u.params   != "" or
00942       u.query    != "" or
00943       u.fragment != ""):
00944     raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
00945 
00946   if debug_http:
00947     rpki.log.debug("Contacting %s" % url)
00948 
00949   request = http_request(
00950     cmd                 = "POST",
00951     path                = u.path,
00952     body                = msg,
00953     callback            = callback,
00954     errback             = errback,
00955     Host                = u.hostname,
00956     Content_Type        = rpki_content_type)
00957 
00958   hostport = (u.hostname or "localhost", u.port or default_tcp_port)
00959 
00960   if debug_http:
00961     rpki.log.debug("Created request %r for %s" % (request, addr_to_string(hostport)))
00962   if hostport not in client_queues:
00963     client_queues[hostport] = http_queue(hostport)
00964   client_queues[hostport].request(request)
00965 
00966   # Defer connection attempt until after we've had time to process any
00967   # pending I/O events, in case connections have closed.
00968 
00969   if debug_http:
00970     rpki.log.debug("Scheduling connection startup for %r" % request)
00971   rpki.async.defer(client_queues[hostport].restart)
00972 
00973 def server(handlers, port, host = ""):
00974   """
00975   Run an HTTP server and wait (forever) for connections.
00976   """
00977 
00978   if not isinstance(handlers, (tuple, list)):
00979     handlers = (("/", handlers),)
00980 
00981   # Yes, this is sick.  So is getaddrinfo() returning duplicate
00982   # records, which RedHat has the gall to claim is a feature.
00983   ai = []
00984   for af in supported_address_families(enable_ipv6_servers):
00985     try:
00986       if host:
00987         h = host
00988       elif have_ipv6 and af == socket.AF_INET6:
00989         h = "::"
00990       else:
00991         h = "0.0.0.0"
00992       for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
00993         if a not in ai:
00994           ai.append(a)
00995     except socket.gaierror:
00996       pass
00997 
00998   for a in ai:
00999     http_listener(addrinfo = a, handlers = handlers)
01000 
01001   rpki.async.event_loop()
01002 
01003 class caller(object):
01004   """
01005   Handle client-side mechanics for protocols based on HTTP, CMS, and
01006   rpki.xml_utils.  Calling sequence is intended to nest within
01007   rpki.async.sync_wrapper.
01008   """
01009 
01010   debug = False
01011 
01012   def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
01013     self.proto = proto
01014     self.client_key = client_key
01015     self.client_cert = client_cert
01016     self.server_ta = server_ta
01017     self.server_cert = server_cert
01018     self.url = url
01019     if debug is not None:
01020       self.debug = debug
01021 
01022   def __call__(self, cb, eb, *pdus):
01023 
01024     def done(r_der):
01025       """
01026       Handle CMS-wrapped XML response message.
01027       """
01028       r_cms = self.proto.cms_msg(DER = r_der)
01029       r_msg = r_cms.unwrap((self.server_ta, self.server_cert))
01030       if self.debug:
01031         print "<!-- Reply -->"
01032         print r_cms.pretty_print_content()
01033       cb(r_msg)
01034 
01035     q_msg = self.proto.msg.query(*pdus)
01036     q_cms = self.proto.cms_msg()
01037     q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert)
01038     if self.debug:
01039       print "<!-- Query -->"
01040       print q_cms.pretty_print_content()
01041 
01042     client(url = self.url, msg = q_der, callback = done, errback = eb)
 All Classes Namespaces Files Functions Variables