RPKI Engine  1.0
http.py (4026)
Go to the documentation of this file.
00001 """
00002 HTTP utilities, both client and server.
00003 
00004 $Id: http.py 4026 2011-10-07 21:43:47Z sra $
00005 
00006 Copyright (C) 2009-2011  Internet Systems Consortium ("ISC")
00007 
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011 
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 
00020 Portions copyright (C) 2007--2008  American Registry for Internet Numbers ("ARIN")
00021 
00022 Permission to use, copy, modify, and distribute this software for any
00023 purpose with or without fee is hereby granted, provided that the above
00024 copyright notice and this permission notice appear in all copies.
00025 
00026 THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
00027 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00028 AND FITNESS.  IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
00029 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00030 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00031 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00032 PERFORMANCE OF THIS SOFTWARE.
00033 """
00034 
00035 import time, socket, asyncore, asynchat, urlparse, sys, random
00036 import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log
00037 import rpki.POW
00038 
00039 ## @var rpki_content_type
00040 # HTTP content type used for all RPKI messages.
00041 rpki_content_type = "application/x-rpki"
00042 
00043 ## @var debug_http
00044 # Verbose chatter about HTTP streams.
00045 debug_http = False
00046 
00047 ## @var want_persistent_client
00048 # Whether we want persistent HTTP client streams, when server also supports them.
00049 want_persistent_client = False
00050 
00051 ## @var want_persistent_server
00052 # Whether we want persistent HTTP server streams, when client also supports them.
00053 want_persistent_server = False
00054 
00055 ## @var default_client_timeout
00056 # Default HTTP client connection timeout.
00057 default_client_timeout = rpki.sundial.timedelta(minutes = 5)
00058 
00059 ## @var default_server_timeout
00060 # Default HTTP server connection timeouts.  Given our druthers, we'd
00061 # prefer that the client close the connection, as this avoids the
00062 # problem of client starting to reuse connection just as server closes
00063 # it, so this should be longer than the client timeout.
00064 default_server_timeout = rpki.sundial.timedelta(minutes = 10)
00065 
00066 ## @var default_http_version
00067 # Preferred HTTP version.
00068 default_http_version = (1, 0)
00069 
00070 ## @var default_tcp_port
00071 # Default port for clients and servers that don't specify one.
00072 default_tcp_port = 80
00073 
00074 ## @var enable_ipv6_servers
00075 # Whether to enable IPv6 listeners.  Enabled by default, as it should
00076 # be harmless.  Has no effect if kernel doesn't support IPv6.
00077 enable_ipv6_servers = True
00078 
00079 ## @var enable_ipv6_clients
00080 # Whether to consider IPv6 addresses when making connections.
00081 # Disabled by default, as IPv6 connectivity is still a bad joke in
00082 # far too much of the world.
00083 enable_ipv6_clients = False
00084 
00085 ## @var use_adns
00086 # Whether to use rpki.adns code.  This is still experimental, so it's
00087 # not (yet) enabled by default.
00088 use_adns = False
00089 
00090 ## @var have_ipv6
00091 # Whether the current machine claims to support IPv6.  Note that just
00092 # because the kernel supports it doesn't mean that the machine has
00093 # usable IPv6 connectivity.  I don't know of a simple portable way to
00094 # probe for connectivity at runtime (the old test of "can you ping
00095 # SRI-NIC.ARPA?" seems a bit dated...).  Don't set this, it's set
00096 # automatically by probing using the socket() system call at runtime.
00097 try:
00098   socket.socket(socket.AF_INET6).close()
00099   socket.IPPROTO_IPV6
00100   socket.IPV6_V6ONLY
00101 except:
00102   have_ipv6 = False
00103 else:
00104   have_ipv6 = True
00105 
00106 def supported_address_families(enable_ipv6):
00107   """
00108   IP address families on which servers should listen, and to consider
00109   when selecting addresses for client connections.
00110   """
00111   if enable_ipv6 and have_ipv6:
00112     return (socket.AF_INET, socket.AF_INET6)
00113   else:
00114     return (socket.AF_INET,)
00115 
00116 def localhost_addrinfo():
00117   """
00118   Return pseudo-getaddrinfo results for localhost.
00119   """
00120   result = [(socket.AF_INET, "127.0.0.1")]
00121   if enable_ipv6_clients and have_ipv6:
00122     result.append((socket.AF_INET6, "::1"))
00123   return result
00124 
00125 class http_message(object):
00126   """
00127   Virtual class representing of one HTTP message.
00128   """
00129 
00130   software_name = "ISC RPKI library"
00131 
00132   def __init__(self, version = None, body = None, headers = None):
00133     self.version = version
00134     self.body = body
00135     self.headers = headers
00136     self.normalize_headers()
00137 
00138   def normalize_headers(self, headers = None):
00139     """
00140     Clean up (some of) the horrible messes that HTTP allows in its
00141     headers.
00142     """
00143     if headers is None:
00144       headers = () if self.headers is None else self.headers.items()
00145       translate_underscore = True
00146     else:
00147       translate_underscore = False
00148     result = {}
00149     for k, v in headers:
00150       if translate_underscore:
00151         k = k.replace("_", "-")
00152       k = "-".join(s.capitalize() for s in k.split("-"))
00153       v = v.strip()
00154       if k in result:
00155         result[k] += ", " + v
00156       else:
00157         result[k] = v
00158     self.headers = result
00159 
00160   @classmethod
00161   def parse_from_wire(cls, headers):
00162     """
00163     Parse and normalize an incoming HTTP message.
00164     """
00165     self = cls()
00166     headers = headers.split("\r\n")
00167     self.parse_first_line(*headers.pop(0).split(None, 2))
00168     for i in xrange(len(headers) - 2, -1, -1):
00169       if headers[i + 1][0].isspace():
00170         headers[i] += headers[i + 1]
00171         del headers[i + 1]
00172     self.normalize_headers([h.split(":", 1) for h in headers])
00173     return self
00174 
00175   def format(self):
00176     """
00177     Format an outgoing HTTP message.
00178     """
00179     s = self.format_first_line()
00180     if self.body is not None:
00181       assert isinstance(self.body, str)
00182       self.headers["Content-Length"] = len(self.body)
00183     for kv in self.headers.iteritems():
00184       s += "%s: %s\r\n" % kv
00185     s += "\r\n"
00186     if self.body is not None:
00187       s += self.body
00188     return s
00189 
00190   def __str__(self):
00191     return self.format()
00192 
00193   def parse_version(self, version):
00194     """
00195     Parse HTTP version, raise an exception if we can't.
00196     """
00197     if version[:5] != "HTTP/":
00198       raise rpki.exceptions.HTTPBadVersion, "Couldn't parse version %s" % version
00199     self.version = tuple(int(i) for i in version[5:].split("."))
00200 
00201   @property
00202   def persistent(self):
00203     """
00204     Figure out whether this HTTP message encourages a persistent connection.
00205     """
00206     c = self.headers.get("Connection")
00207     if self.version == (1, 1):
00208       return c is None or "close" not in c.lower()
00209     elif self.version == (1, 0):
00210       return c is not None and "keep-alive" in c.lower()
00211     else:
00212       return False
00213 
00214 class http_request(http_message):
00215   """
00216   HTTP request message.
00217   """
00218 
00219   def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
00220     assert cmd == "POST" or body is None
00221     http_message.__init__(self, version = version, body = body, headers = headers)
00222     self.cmd = cmd
00223     self.path = path
00224     self.callback = callback
00225     self.errback = errback
00226     self.retried = False
00227 
00228   def parse_first_line(self, cmd, path, version):
00229     """
00230     Parse first line of HTTP request message.
00231     """
00232     self.parse_version(version)
00233     self.cmd = cmd
00234     self.path = path
00235 
00236   def format_first_line(self):
00237     """
00238     Format first line of HTTP request message, and set up the
00239     User-Agent header.
00240     """
00241     self.headers.setdefault("User-Agent", self.software_name)
00242     return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
00243 
00244   def __repr__(self):
00245     return rpki.log.log_repr(self, self.cmd, self.path)
00246             
00247 class http_response(http_message):
00248   """
00249   HTTP response message.
00250   """
00251 
00252   def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
00253     http_message.__init__(self, version = version, body = body, headers = headers)
00254     self.code = code
00255     self.reason = reason
00256 
00257   def parse_first_line(self, version, code, reason):
00258     """
00259     Parse first line of HTTP response message.
00260     """
00261     self.parse_version(version)
00262     self.code = int(code)
00263     self.reason = reason
00264 
00265   def format_first_line(self):
00266     """
00267     Format first line of HTTP response message, and set up Date and
00268     Server headers.
00269     """
00270     self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
00271     self.headers.setdefault("Server", self.software_name)
00272     return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
00273 
00274   def __repr__(self):
00275     return rpki.log.log_repr(self, self.code, self.reason)
00276 
00277 def log_method(self, msg, logger = rpki.log.debug):
00278   """
00279   Logging method used in several different classes.
00280   """
00281   assert isinstance(logger, rpki.log.logger)
00282   if debug_http or logger is not rpki.log.debug:
00283     logger("%r: %s" % (self, msg))
00284 
00285 def addr_to_string(addr):
00286   """
00287   Convert socket addr tuple to printable string.  Assumes 2-element
00288   tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
00289   anything else.
00290   """
00291 
00292   if len(addr) == 2:
00293     return "%s:%d" % (addr[0], addr[1])
00294   if len(addr) == 4:
00295     return "%s.%d" % (addr[0], addr[1])
00296   raise TypeError
00297 
00298 class http_stream(asynchat.async_chat):
00299   """
00300   Virtual class representing an HTTP message stream.
00301   """
00302 
00303   log = log_method
00304 
00305   def __repr__(self):
00306     status = ["connected"] if self.connected else []
00307     try:
00308       status.append(addr_to_string(self.addr))
00309     except TypeError:
00310       pass
00311     return rpki.log.log_repr(self, *status)
00312 
00313   def __init__(self, sock = None):
00314     asynchat.async_chat.__init__(self, sock)
00315     self.buffer = []
00316     self.timer = rpki.async.timer(self.handle_timeout)
00317     self.restart()
00318 
00319   def restart(self):
00320     """
00321     (Re)start HTTP message parser, reset timer.
00322     """
00323     assert not self.buffer
00324     self.chunk_handler = None
00325     self.set_terminator("\r\n\r\n")
00326     self.update_timeout()
00327 
00328   def update_timeout(self):
00329     """
00330     Put this stream's timer in known good state: set it to the
00331     stream's timeout value if we're doing timeouts, otherwise clear
00332     it.
00333     """
00334     if self.timeout is not None:
00335       self.log("Setting timeout %s" % self.timeout)
00336       self.timer.set(self.timeout)
00337     else:
00338       self.log("Clearing timeout")
00339       self.timer.cancel()
00340 
00341   def collect_incoming_data(self, data):
00342     """
00343     Buffer incoming data from asynchat.
00344     """
00345     self.buffer.append(data)
00346     self.update_timeout()
00347 
00348   def get_buffer(self):
00349     """
00350     Consume data buffered from asynchat.
00351     """
00352     val = "".join(self.buffer)
00353     self.buffer = []
00354     return val
00355 
00356   def found_terminator(self):
00357     """
00358     Asynchat reported that it found whatever terminator we set, so
00359     figure out what to do next.  This can be messy, because we can be
00360     in any of several different states:
00361 
00362     @li We might be handling chunked HTTP, in which case we have to
00363     initialize the chunk decoder;
00364 
00365     @li We might have found the end of the message body, in which case
00366     we can (finally) process it; or
00367 
00368     @li We might have just gotten to the end of the message headers,
00369     in which case we have to parse them to figure out which of three
00370     separate mechanisms (chunked, content-length, TCP close) is going
00371     to tell us how to find the end of the message body.
00372     """
00373     self.update_timeout()
00374     if self.chunk_handler:
00375       self.chunk_handler()
00376     elif not isinstance(self.get_terminator(), str):
00377       self.handle_body()
00378     else:
00379       self.msg = self.parse_type.parse_from_wire(self.get_buffer())
00380       if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
00381         self.msg.body = []
00382         self.chunk_handler = self.chunk_header
00383         self.set_terminator("\r\n")
00384       elif "Content-Length" in self.msg.headers:
00385         self.set_terminator(int(self.msg.headers["Content-Length"]))
00386       else:
00387         self.handle_no_content_length()
00388       
00389   def chunk_header(self):
00390     """
00391     Asynchat just handed us what should be the header of one chunk of
00392     a chunked encoding stream.  If this chunk has a body, set the
00393     stream up to read it; otherwise, this is the last chunk, so start
00394     the process of exiting the chunk decoder.
00395     """
00396     n = int(self.get_buffer().partition(";")[0], 16)
00397     self.log("Chunk length %s" % n)
00398     if n:
00399       self.chunk_handler = self.chunk_body
00400       self.set_terminator(n)
00401     else:
00402       self.msg.body = "".join(self.msg.body)
00403       self.chunk_handler = self.chunk_discard_trailer
00404 
00405   def chunk_body(self):
00406     """
00407     Asynchat just handed us what should be the body of a chunk of the
00408     body of a chunked message (sic).  Save it, and prepare to move on
00409     to the next chunk.
00410     """
00411     self.log("Chunk body")
00412     self.msg.body += self.buffer
00413     self.buffer = []
00414     self.chunk_handler = self.chunk_discard_crlf
00415     self.set_terminator("\r\n")
00416 
00417   def chunk_discard_crlf(self):
00418     """
00419     Consume the CRLF that terminates a chunk, reinitialize chunk
00420     decoder to be ready for the next chunk.
00421     """
00422     self.log("Chunk CRLF")
00423     s = self.get_buffer()
00424     assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
00425     self.chunk_handler = self.chunk_header
00426 
00427   def chunk_discard_trailer(self):
00428     """
00429     Consume chunk trailer, which should be empty, then (finally!) exit
00430     the chunk decoder and hand complete message off to the application.
00431     """
00432     self.log("Chunk trailer")
00433     s = self.get_buffer()
00434     assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
00435     self.chunk_handler = None
00436     self.handle_message()
00437 
00438   def handle_body(self):
00439     """
00440     Hand normal (not chunked) message off to the application.
00441     """
00442     self.msg.body = self.get_buffer()
00443     self.handle_message()
00444 
00445   def handle_error(self):
00446     """
00447     Asynchat (or asyncore, or somebody) raised an exception.  See
00448     whether it's one we should just pass along, otherwise log a stack
00449     trace and close the stream.
00450     """
00451     self.timer.cancel()
00452     etype = sys.exc_info()[0]
00453     if etype in (SystemExit, rpki.async.ExitNow):
00454       raise
00455     rpki.log.traceback()
00456     if etype is not rpki.exceptions.HTTPClientAborted:
00457       self.log("Closing due to error", rpki.log.warn)
00458       self.close()
00459 
00460   def handle_timeout(self):
00461     """
00462     Inactivity timer expired, close connection with prejudice.
00463     """
00464     self.log("Timeout, closing")
00465     self.close()
00466 
00467   def handle_close(self):
00468     """
00469     Wrapper around asynchat connection close handler, so that we can
00470     log the event, cancel timer, and so forth.
00471     """
00472     self.log("Close event in HTTP stream handler")
00473     self.timer.cancel()
00474     asynchat.async_chat.handle_close(self)
00475 
00476 class http_server(http_stream):
00477   """
00478   HTTP server stream.
00479   """
00480 
00481   ## @var parse_type
00482   # Stream parser should look for incoming HTTP request messages.
00483   parse_type = http_request
00484 
00485   ## @var timeout
00486   # Use the default server timeout value set in the module header.
00487   timeout = default_server_timeout
00488 
00489   def __init__(self, sock, handlers):
00490     self.handlers = handlers
00491     http_stream.__init__(self, sock = sock)
00492     self.expect_close = not want_persistent_server
00493     self.log("Starting")
00494 
00495   def handle_no_content_length(self):
00496     """
00497     Handle an incoming message that used neither chunking nor a
00498     Content-Length header (that is: this message will be the last one
00499     in this server stream).  No special action required.
00500     """
00501     self.handle_message()
00502 
00503   def find_handler(self, path):
00504     """
00505     Helper method to search self.handlers.
00506     """
00507     for s, h in self.handlers:
00508       if path.startswith(s):
00509         return h
00510     return None
00511 
00512   def handle_message(self):
00513     """
00514     HTTP layer managed to deliver a complete HTTP request to
00515     us, figure out what to do with it.  Check the command and
00516     Content-Type, look for a handler, and if everything looks right,
00517     pass the message body, path, and a reply callback to the handler.
00518     """
00519     self.log("Received request %r" % self.msg)
00520     if not self.msg.persistent:
00521       self.expect_close = True
00522     handler = self.find_handler(self.msg.path)
00523     error = None
00524     if self.msg.cmd != "POST":
00525       error = 501, "No handler for method %s" % self.msg.cmd
00526     elif self.msg.headers["Content-Type"] != rpki_content_type:
00527       error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
00528     elif handler is None:
00529       error = 404, "No handler for URL %s" % self.msg.path
00530     if error is None:
00531       try:
00532         handler(self.msg.body, self.msg.path, self.send_reply)
00533       except (rpki.async.ExitNow, SystemExit):
00534         raise
00535       except Exception, e:
00536         rpki.log.traceback()
00537         self.send_error(500, "Unhandled exception %s" % e)
00538     else:
00539       self.send_error(code = error[0], reason = error[1])
00540 
00541   def send_error(self, code, reason):
00542     """
00543     Send an error response to this request.
00544     """
00545     self.send_message(code = code, reason = reason)
00546 
00547   def send_reply(self, code, body = None, reason = "OK"):
00548     """
00549     Send a reply to this request.
00550     """
00551     self.send_message(code = code, body = body, reason = reason)
00552 
00553   def send_message(self, code, reason = "OK", body = None):
00554     """
00555     Queue up reply message.  If both parties agree that connection is
00556     persistant, and if no error occurred, restart this stream to
00557     listen for next message; otherwise, queue up a close event for
00558     this stream so it will shut down once the reply has been sent.
00559     """
00560     self.log("Sending response %s %s" % (code, reason))
00561     if code >= 400:
00562       self.expect_close = True
00563     msg = http_response(code = code, reason = reason, body = body,
00564                         Content_Type = rpki_content_type,
00565                         Connection = "Close" if self.expect_close else "Keep-Alive")
00566     self.push(msg.format())
00567     if self.expect_close:
00568       self.log("Closing")
00569       self.timer.cancel()
00570       self.close_when_done()
00571     else:      
00572       self.log("Listening for next message")
00573       self.restart()
00574 
00575 class http_listener(asyncore.dispatcher):
00576   """
00577   Listener for incoming HTTP connections.
00578   """
00579 
00580   log = log_method
00581 
00582   def __repr__(self):
00583     try:
00584       status = (addr_to_string(self.addr),)
00585     except TypeError:
00586       status = ()
00587     return rpki.log.log_repr(self, *status)
00588 
00589   def __init__(self, handlers, addrinfo):
00590     asyncore.dispatcher.__init__(self)
00591     self.handlers = handlers
00592     try:
00593       af, socktype, proto, canonname, sockaddr = addrinfo
00594       self.create_socket(af, socktype)
00595       self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00596       try:
00597         self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00598       except AttributeError:
00599         pass
00600       if have_ipv6 and af == socket.AF_INET6:
00601         self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
00602       self.bind(sockaddr)
00603       self.listen(5)
00604     except Exception, e:
00605       self.log("Couldn't set up HTTP listener: %s" % e, rpki.log.warn)
00606       rpki.log.traceback()
00607       self.close()
00608     for h in handlers:
00609       self.log("Handling %s" % h[0])
00610 
00611   def handle_accept(self):
00612     """
00613     Asyncore says we have an incoming connection, spawn an http_server
00614     stream for it and pass along all of our handler data.
00615     """
00616     try:
00617       s, client = self.accept()
00618       self.log("Accepting connection from %s" % addr_to_string(client))
00619       http_server(sock = s, handlers = self.handlers)
00620     except (rpki.async.ExitNow, SystemExit):
00621       raise
00622     except Exception, e:
00623       self.log("Unable to accept connection: %s" % e)
00624       self.handle_error()
00625 
00626   def handle_error(self):
00627     """
00628     Asyncore signaled an error, pass it along or log it.
00629     """
00630     if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
00631       raise
00632     self.log("Error in HTTP listener", rpki.log.warn)
00633     rpki.log.traceback()
00634 
00635 class http_client(http_stream):
00636   """
00637   HTTP client stream.
00638   """
00639 
00640   ## @var parse_type
00641   # Stream parser should look for incoming HTTP response messages.
00642   parse_type = http_response
00643 
00644   ## @var timeout
00645   # Use the default client timeout value set in the module header.
00646   timeout = default_client_timeout
00647 
00648   ## @var state
00649   # Application layer connection state.
00650   state = None
00651 
00652   def __init__(self, queue, hostport):
00653     self.log("Creating new connection to %s" % addr_to_string(hostport))
00654     http_stream.__init__(self)
00655     self.queue = queue
00656     self.host = hostport[0]
00657     self.port = hostport[1]
00658     self.set_state("opening")
00659     self.expect_close = not want_persistent_client
00660 
00661   def start(self):
00662     """
00663     Create socket and request a connection.
00664     """
00665     if not use_adns:
00666       self.log("Not using ADNS")
00667       self.gotaddrinfo([(socket.AF_INET, self.host)])
00668     elif self.host == "localhost":
00669       self.log("Bypassing DNS for localhost")
00670       self.gotaddrinfo(localhost_addrinfo())
00671     else:
00672       import rpki.adns                  # This should move to start of file once we've decided to inflict it on all users
00673       families = supported_address_families(enable_ipv6_clients)
00674       self.log("Starting ADNS lookup for %s in families %r" % (self.host, families))
00675       rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)
00676 
00677   def dns_error(self, e):
00678     """
00679     Handle DNS lookup errors.  For now, just whack the connection.
00680     Undoubtedly we should do something better with diagnostics here.
00681     """
00682     self.handle_error()
00683 
00684   def gotaddrinfo(self, addrinfo):
00685     """
00686     Got address data from DNS, create socket and request connection.
00687     """
00688     try:
00689       self.af, self.address = random.choice(addrinfo)
00690       self.log("Connecting to AF %s host %s port %s addr %s" % (self.af, self.host, self.port, self.address))
00691       self.create_socket(self.af, socket.SOCK_STREAM)
00692       self.connect((self.address, self.port))
00693       if self.addr is None:
00694         self.addr = (self.host, self.port)
00695       self.update_timeout()
00696     except (rpki.async.ExitNow, SystemExit):
00697       raise
00698     except Exception:
00699       self.handle_error()
00700 
00701   def handle_connect(self):
00702     """
00703     Asyncore says socket has connected.
00704     """
00705     self.log("Socket connected")
00706     self.set_state("idle")
00707     assert self.queue.client is self
00708     self.queue.send_request()
00709 
00710   def set_state(self, state):
00711     """
00712     Set HTTP client connection state.
00713     """
00714     self.log("State transition %s => %s" % (self.state, state))
00715     self.state = state
00716 
00717   def handle_no_content_length(self):
00718     """
00719     Handle response message that used neither chunking nor a
00720     Content-Length header (that is: this message will be the last one
00721     in this server stream).  In this case we want to read until we
00722     reach the end of the data stream.
00723     """
00724     self.set_terminator(None)
00725 
00726   def send_request(self, msg):
00727     """
00728     Queue up request message and kickstart connection.
00729     """
00730     self.log("Sending request %r" % msg)
00731     assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
00732     self.set_state("request-sent")
00733     msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
00734     self.push(msg.format())
00735     self.restart()
00736 
00737   def handle_message(self):
00738     """
00739     Handle incoming HTTP response message.  Make sure we're in a state
00740     where we expect to see such a message (and allow the mysterious
00741     empty messages that Apache sends during connection close, no idea
00742     what that is supposed to be about).  If everybody agrees that the
00743     connection should stay open, put it into an idle state; otherwise,
00744     arrange for the stream to shut down.
00745     """
00746 
00747     self.log("Message received, state %s" % self.state)
00748 
00749     if not self.msg.persistent:
00750       self.expect_close = True
00751 
00752     if self.state != "request-sent":
00753       if self.state == "closing":
00754         assert not self.msg.body
00755         self.log("Ignoring empty response received while closing")
00756         return
00757       raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state)
00758 
00759     if self.expect_close:
00760       self.log("Closing")
00761       self.set_state("closing")
00762       self.close_when_done()
00763     else:
00764       self.log("Idling")
00765       self.set_state("idle")
00766       self.update_timeout()
00767 
00768     if self.msg.code != 200:
00769       raise rpki.exceptions.HTTPRequestFailed, "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)
00770     self.queue.return_result(self, self.msg, detach = self.expect_close)
00771 
00772   def handle_close(self):
00773     """
00774     Asyncore signaled connection close.  If we were waiting for that
00775     to find the end of a response message, process the resulting
00776     message now; if we were waiting for the response to a request we
00777     sent, signal the error.
00778     """
00779     http_stream.handle_close(self)
00780     self.log("State %s" % self.state)
00781     if self.get_terminator() is None:
00782       self.handle_body()
00783     elif self.state == "request-sent":
00784       raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event"
00785     else:
00786       self.queue.detach(self)
00787 
00788   def handle_timeout(self):
00789     """
00790     Connection idle timer has expired.  Shut down connection in any
00791     case, noisily if we weren't idle.
00792     """
00793     bad = self.state not in ("idle", "closing")
00794     if bad:
00795       self.log("Timeout while in state %s" % self.state, rpki.log.warn)
00796     http_stream.handle_timeout(self)
00797     if bad:
00798       try:
00799         raise rpki.exceptions.HTTPTimeout
00800       except:
00801         self.handle_error()
00802     else:
00803       self.queue.detach(self)
00804 
00805   def handle_error(self):
00806     """
00807     Asyncore says something threw an exception.  Log it, then shut
00808     down the connection and pass back the exception.
00809     """
00810     eclass, edata = sys.exc_info()[0:2]
00811     self.log("Error on HTTP client connection %s:%s %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn)
00812     http_stream.handle_error(self)
00813     self.queue.return_result(self, edata, detach = True)
00814 
00815 class http_queue(object):
00816   """
00817   Queue of pending HTTP requests for a single destination.  This class
00818   is very tightly coupled to http_client; http_client handles the HTTP
00819   stream itself, this class provides a slightly higher-level API.
00820   """
00821 
00822   log = log_method
00823 
00824   def __repr__(self):
00825     return rpki.log.log_repr(self, "%s" % addr_to_string(self.hostport))
00826 
00827   def __init__(self, hostport):
00828     self.hostport = hostport
00829     self.client = None
00830     self.log("Created")
00831     self.queue = []
00832 
00833   def request(self, *requests):
00834     """
00835     Append http_request object(s) to this queue.
00836     """
00837     self.log("Adding requests %r" % requests)
00838     self.queue.extend(requests)
00839 
00840   def restart(self):
00841     """
00842     Send next request for this queue, if we can.  This may involve
00843     starting a new http_client stream, reusing an existing idle
00844     stream, or just ignoring this request if there's an active client
00845     stream already; in the last case, handling of the response (or
00846     exception, or timeout) for the query currently in progress will
00847     call this method when it's time to kick out the next query.
00848     """
00849     try:
00850       if self.client is None:
00851         self.client = http_client(self, self.hostport)
00852         self.log("Attached client %r" % self.client)
00853         self.client.start()
00854       elif self.client.state == "idle":
00855         self.log("Sending request to existing client %r" % self.client)
00856         self.send_request()
00857       else:
00858         self.log("Client %r exists in state %r" % (self.client, self.client.state))
00859     except (rpki.async.ExitNow, SystemExit):
00860       raise
00861     except Exception, e:
00862       self.return_result(self.client, e, detach = True)
00863 
00864   def send_request(self):
00865     """
00866     Kick out the next query in this queue, if any.
00867     """
00868     if self.queue:
00869       self.client.send_request(self.queue[0])
00870 
00871   def detach(self, client_):
00872     """
00873     Detatch a client from this queue.  Silently ignores attempting to
00874     detach a client that is not attached to this queue, to simplify
00875     handling of what otherwise would be a nasty set of race
00876     conditions.
00877     """
00878     if client_ is self.client:
00879       self.log("Detaching client %r" % client_)
00880       self.client = None
00881 
00882   def return_result(self, client, result, detach = False):
00883     """
00884     Client stream has returned a result, which we need to pass along
00885     to the original caller.  Result may be either an HTTP response
00886     message or an exception.  In either case, once we're done
00887     processing this result, kick off next message in the queue, if any.
00888     """
00889 
00890     if client is not self.client:
00891       self.log("Wrong client trying to return result.  THIS SHOULD NOT HAPPEN.  Dropping result %r" % result, rpki.log.warn)
00892       return
00893 
00894     if detach:
00895       self.detach(client)
00896 
00897     try:
00898       req = self.queue.pop(0)
00899       self.log("Dequeuing request %r" % req)
00900     except IndexError:
00901       self.log("No caller.  THIS SHOULD NOT HAPPEN.  Dropping result %r" % result, rpki.log.warn)
00902       return
00903 
00904     assert isinstance(result, http_response) or isinstance(result, Exception)
00905 
00906     if isinstance(result, http_response):
00907       try:
00908         self.log("Returning result %r to caller" % result)
00909         req.callback(result.body)
00910       except (rpki.async.ExitNow, SystemExit):
00911         raise
00912       except Exception, e:
00913         result = e
00914 
00915     if isinstance(result, Exception):
00916       try:
00917         self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn)
00918         req.errback(result)
00919       except (rpki.async.ExitNow, SystemExit):
00920         raise
00921       except Exception:
00922         #
00923         # If we get here, we may have lost the event chain.  Not
00924         # obvious what we can do about it at this point, but force a
00925         # traceback so that it will be somewhat obvious that something
00926         # really bad happened.
00927         #
00928         self.log("Exception in exception callback", rpki.log.warn)
00929         rpki.log.traceback(True)
00930 
00931     self.log("Queue: %r" % self.queue)
00932 
00933     if self.queue:
00934       self.restart()
00935 
00936 ## @var client_queues
00937 # Map of (host, port) tuples to http_queue objects.
00938 client_queues = {}
00939 
00940 def client(msg, url, callback, errback):
00941   """
00942   Open client HTTP connection, send a message, set up callbacks to
00943   handle response.
00944   """
00945 
00946   u = urlparse.urlparse(url)
00947 
00948   if (u.scheme not in ("", "http") or
00949       u.username is not None or
00950       u.password is not None or
00951       u.params   != "" or
00952       u.query    != "" or
00953       u.fragment != ""):
00954     raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url
00955 
00956   if debug_http:
00957     rpki.log.debug("Contacting %s" % url)
00958 
00959   request = http_request(
00960     cmd                 = "POST",
00961     path                = u.path,
00962     body                = msg,
00963     callback            = callback,
00964     errback             = errback,
00965     Host                = u.hostname,
00966     Content_Type        = rpki_content_type)
00967 
00968   hostport = (u.hostname or "localhost", u.port or default_tcp_port)
00969 
00970   if debug_http:
00971     rpki.log.debug("Created request %r for %s" % (request, addr_to_string(hostport)))
00972   if hostport not in client_queues:
00973     client_queues[hostport] = http_queue(hostport)
00974   client_queues[hostport].request(request)
00975 
00976   # Defer connection attempt until after we've had time to process any
00977   # pending I/O events, in case connections have closed.
00978 
00979   if debug_http:
00980     rpki.log.debug("Scheduling connection startup for %r" % request)
00981   rpki.async.defer(client_queues[hostport].restart)
00982 
00983 def server(handlers, port, host = ""):
00984   """
00985   Run an HTTP server and wait (forever) for connections.
00986   """
00987 
00988   if not isinstance(handlers, (tuple, list)):
00989     handlers = (("/", handlers),)
00990 
00991   # Yes, this is sick.  So is getaddrinfo() returning duplicate
00992   # records, which RedHat has the gall to claim is a feature.
00993   ai = []
00994   for af in supported_address_families(enable_ipv6_servers):
00995     try:
00996       if host:
00997         h = host
00998       elif have_ipv6 and af == socket.AF_INET6:
00999         h = "::"
01000       else:
01001         h = "0.0.0.0"
01002       for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
01003         if a not in ai:
01004           ai.append(a)
01005     except socket.gaierror:
01006       pass
01007 
01008   for a in ai:
01009     http_listener(addrinfo = a, handlers = handlers)
01010 
01011   rpki.async.event_loop()
01012 
01013 class caller(object):
01014   """
01015   Handle client-side mechanics for protocols based on HTTP, CMS, and
01016   rpki.xml_utils.  Calling sequence is intended to nest within
01017   rpki.async.sync_wrapper.
01018   """
01019 
01020   debug = False
01021 
01022   def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
01023     self.proto = proto
01024     self.client_key = client_key
01025     self.client_cert = client_cert
01026     self.server_ta = server_ta
01027     self.server_cert = server_cert
01028     self.url = url
01029     if debug is not None:
01030       self.debug = debug
01031 
01032   def __call__(self, cb, eb, *pdus):
01033 
01034     def done(r_der):
01035       """
01036       Handle CMS-wrapped XML response message.
01037       """
01038       r_cms = self.proto.cms_msg(DER = r_der)
01039       r_msg = r_cms.unwrap((self.server_ta, self.server_cert))
01040       if self.debug:
01041         print "<!-- Reply -->"
01042         print r_cms.pretty_print_content()
01043       cb(r_msg)
01044 
01045     q_msg = self.proto.msg.query(*pdus)
01046     q_cms = self.proto.cms_msg()
01047     q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert)
01048     if self.debug:
01049       print "<!-- Query -->"
01050       print q_cms.pretty_print_content()
01051 
01052     client(url = self.url, msg = q_der, callback = done, errback = eb)
 All Classes Namespaces Files Functions Variables Properties