diff options
-rw-r--r-- | myrpki/myrpki.py | 8 | ||||
-rw-r--r-- | myrpki/myrpki.rng | 2 | ||||
-rw-r--r-- | myrpki/yamltest.py | 2 | ||||
-rw-r--r-- | rpkid/rpki/async.py | 23 | ||||
-rw-r--r-- | rpkid/rpki/cli.py | 29 | ||||
-rw-r--r-- | rpkid/rpki/https.py | 353 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 5 | ||||
-rw-r--r-- | rpkid/rpki/sundial.py | 34 | ||||
-rw-r--r-- | scripts/pylint.rc | 2 |
9 files changed, 399 insertions, 59 deletions
diff --git a/myrpki/myrpki.py b/myrpki/myrpki.py index a67ce15e..9dbb4ebd 100644 --- a/myrpki/myrpki.py +++ b/myrpki/myrpki.py @@ -737,7 +737,6 @@ class CA(object): finally: if not filename and os.path.exists(fn): os.unlink(fn) - pass def xcert(self, cert, path_restriction = 0): """ @@ -1325,7 +1324,7 @@ class main(rpki.cli.Cmd): try: import rpki.https, rpki.resource_set, rpki.relaxng, rpki.exceptions - import rpki.left_right, rpki.x509, rpki.async, lxml.etree + import rpki.left_right, rpki.x509, rpki.async if hasattr(warnings, "catch_warnings"): with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) @@ -1409,8 +1408,6 @@ class main(rpki.cli.Cmd): xmlfiles.extend(argv) - my_handle = None - for xmlfile in xmlfiles: # Parse XML file and validate it against our scheme @@ -1419,9 +1416,6 @@ class main(rpki.cli.Cmd): handle = tree.get("handle") - if xmlfile == my_xmlfile: - my_handle = handle - # Update IRDB with parsed resource and roa-request data. cur.execute( diff --git a/myrpki/myrpki.rng b/myrpki/myrpki.rng index dc4f18e6..111b5ed0 100644 --- a/myrpki/myrpki.rng +++ b/myrpki/myrpki.rng @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- - $Id: myrpki.rnc 3105 2010-03-16 22:24:19Z sra $ + $Id: myrpki.rnc 3187 2010-04-12 17:56:03Z sra $ RelaxNG Schema for MyRPKI XML messages. diff --git a/myrpki/yamltest.py b/myrpki/yamltest.py index 08153209..f6a82e09 100644 --- a/myrpki/yamltest.py +++ b/myrpki/yamltest.py @@ -45,7 +45,7 @@ PERFORMANCE OF THIS SOFTWARE. """ -import subprocess, csv, re, os, getopt, sys, base64, yaml, signal, errno, time +import subprocess, re, os, getopt, sys, yaml, signal, time import rpki.resource_set, rpki.sundial, rpki.config, rpki.log, myrpki # Nasty regular expressions for parsing config files. Sadly, while diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index 758a437a..fc6ba663 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -68,15 +68,17 @@ class iterator(object): self.doit() def doit(self): + """ + Implement the iterator protocol: attempt to call the item handler + with the next iteration value, call the termination handler if the + iterator signaled StopIteration. + """ try: self.item_callback(self, self.iterator.next()) except StopIteration: if self.done_callback is not None: self.done_callback() - def ignore(self, ignored): - self() - class timer(object): """ Timer construct for event-driven code. It can be used in either of two ways: @@ -319,10 +321,18 @@ class sync_wrapper(object): self.func = func def cb(self, res = None): + """ + Wrapped code has requested normal termination. Store result, and + exit the event loop. + """ self.res = res raise ExitNow def eb(self, err): + """ + Wrapped code raised an exception. Store exception data, then exit + the event loop. + """ exc_info = sys.exc_info() self.err = exc_info if exc_info[1] is err else err raise ExitNow @@ -330,6 +340,10 @@ class sync_wrapper(object): def __call__(self, *args, **kwargs): def thunk(): + """ + Deferred action to call the wrapped code once event system is + running. + """ try: self.func(self.cb, self.eb, *args, **kwargs) except ExitNow: @@ -365,6 +379,9 @@ class gc_summary(object): self.timer.set(self.interval) def handler(self): + """ + Collect and log GC state for this period, reset timer. + """ rpki.log.debug("gc_summary: Running gc.collect()") gc.collect() rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold) diff --git a/rpkid/rpki/cli.py b/rpkid/rpki/cli.py index 10ace337..a67b40a1 100644 --- a/rpkid/rpki/cli.py +++ b/rpkid/rpki/cli.py @@ -75,10 +75,20 @@ class Cmd(cmd.Cmd): do_quit = do_exit def emptyline(self): + """ + Handle an empty line. cmd module default is to repeat the last + command, which I find to be violation of the principal of least + astonishment, so my preference is that an empty line does nothing. + """ if self.emptyline_repeats_last_command: cmd.Cmd.emptyline(self) def filename_complete(self, text, line, begidx, endidx): + """ + Filename completion handler, with hack to restore what I consider + the normal (bash-like) behavior when one hits the completion key + and there's only one match. + """ result = glob.glob(text + "*") if len(result) == 1: path = result.pop() @@ -88,8 +98,13 @@ class Cmd(cmd.Cmd): result.append(path + " ") return result - def completenames(self, *args): - result = set(cmd.Cmd.completenames(self, *args)) + def completenames(self, text, *ignored): + """ + Command name completion handler, with hack to restore what I + consider the normal (bash-like) behavior when one hits the + completion key and there's only one match. + """ + result = set(cmd.Cmd.completenames(self, text, *ignored)) if len(result) == 1: result.add(result.pop() + " ") return list(result) @@ -101,7 +116,11 @@ class Cmd(cmd.Cmd): """ self.stdout.write(self.help_help.__doc__ + "\n") - def complete_help(self, text, *ignored): + def complete_help(self, *args): + """ + Better completion function for help command arguments. + """ + text = args[0] names = self.get_names() result = [] for prefix in ("do_", "help_"): @@ -111,6 +130,10 @@ class Cmd(cmd.Cmd): if have_readline: def cmdloop_with_history(self): + """ + Better command loop, with history file and tweaked readline + completion delimiters. + """ old_completer_delims = readline.get_completer_delims() if self.histfile is not None: try: diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index d969fb54..085c5347 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -36,30 +36,43 @@ import time, socket, asyncore, asynchat, urlparse, sys import rpki.async, rpki.sundial, rpki.x509, rpki.exceptions, rpki.log import POW +## @var rpki_content_type +# HTTP content type used for all RPKI messages. rpki_content_type = "application/x-rpki" - -# ================================================================ - -# Verbose chatter about HTTP streams +## @var debug_http +# Verbose chatter about HTTP streams. debug_http = False -# Verbose chatter about TLS certificates +## @var debug_tls_certs +# Verbose chatter about TLS certificates. debug_tls_certs = False -# Whether we want persistent HTTP streams, when peer also supports them +## @var want_persistent_client +# Whether we want persistent HTTP client streams, when server also supports them. want_persistent_client = False -want_persistent_server = False -# Default HTTP connection timeouts. Given our druthers, we'd prefer -# that the client close the connection, as this avoids the problem of -# client starting to reuse connection just as server closes it. +## @var want_persistent_server +# Whether we want persistent HTTP server streams, when client also supports them. +want_persistent_server = False +## @var default_client_timeout +# Default HTTP client connection timeout. default_client_timeout = rpki.sundial.timedelta(minutes = 15) + +## @var default_server_timeout +# Default HTTP server connection timeouts. Given our druthers, we'd +# prefer that the client close the connection, as this avoids the +# problem of client starting to reuse connection just as server closes +# it, so this should be longer than the client timeout. default_server_timeout = rpki.sundial.timedelta(minutes = 20) +## @var default_http_version +# Preferred HTTP version. default_http_version = (1, 0) +## @var supported_address_families +# # IP address families to support. Almost all the code is in place for # IPv6, the missing bits are DNS support that would let us figure out # which address family to request, and configuration support to let us @@ -68,13 +81,16 @@ default_http_version = (1, 0) # # Address families on which to listen; first entry is also the default # for opening new connections. - if False: supported_address_families = (socket.AF_INET, socket.AF_INET6) else: supported_address_families = (socket.AF_INET,) + class http_message(object): + """ + Virtual class representing of one HTTP message. + """ software_name = "ISC RPKI library" @@ -85,6 +101,10 @@ class http_message(object): self.normalize_headers() def normalize_headers(self, headers = None): + """ + Clean up (some of) the horrible messes that HTTP allows in its + headers. + """ if headers is None: headers = () if self.headers is None else self.headers.items() translate_underscore = True @@ -104,6 +124,9 @@ class http_message(object): @classmethod def parse_from_wire(cls, headers): + """ + Parse and normalize an incoming HTTP message. + """ self = cls() headers = headers.split("\r\n") self.parse_first_line(*headers.pop(0).split(None, 2)) @@ -115,6 +138,9 @@ class http_message(object): return self def format(self): + """ + Format an outgoing HTTP message. + """ s = self.format_first_line() if self.body is not None: assert isinstance(self.body, str) @@ -130,11 +156,17 @@ class http_message(object): return self.format() def parse_version(self, version): + """ + Parse HTTP version, raise an exception if we can't. + """ if version[:5] != "HTTP/": raise rpki.exceptions.HTTPSBadVersion, "Couldn't parse version %s" % version self.version = tuple(int(i) for i in version[5:].split(".")) def persistent(self): + """ + Figure out whether this HTTP message encourages a persistent connection. + """ c = self.headers.get("Connection") if self.version == (1, 1): return c is None or "close" not in c.lower() @@ -144,6 +176,9 @@ class http_message(object): return False class http_request(http_message): + """ + HTTP request message. + """ def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): assert cmd == "POST" or body is None @@ -155,15 +190,25 @@ class http_request(http_message): self.retried = False def parse_first_line(self, cmd, path, version): + """ + Parse first line of HTTP request message. + """ self.parse_version(version) self.cmd = cmd self.path = path def format_first_line(self): + """ + Format first line of HTTP request message, and set up the + User-Agent header. + """ self.headers.setdefault("User-Agent", self.software_name) return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) class http_response(http_message): + """ + HTTP response message. + """ def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): http_message.__init__(self, version = version, body = body, headers = headers) @@ -171,21 +216,34 @@ class http_response(http_message): self.reason = reason def parse_first_line(self, version, code, reason): + """ + Parse first line of HTTP response message. + """ self.parse_version(version) self.code = int(code) self.reason = reason def format_first_line(self): + """ + Format first line of HTTP response message, and set up Date and + Server headers. + """ self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) self.headers.setdefault("Server", self.software_name) return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) def log_method(self, msg, logger = rpki.log.debug): + """ + Logging method used in several different classes. + """ assert isinstance(logger, rpki.log.logger) if debug_http or logger is not rpki.log.debug: logger("%r: %s" % (self, msg)) class http_stream(asynchat.async_chat): + """ + Virtual class representing an HTTP message stream. + """ log = log_method tls = None @@ -199,15 +257,20 @@ class http_stream(asynchat.async_chat): self.restart() def restart(self): + """ + (Re)start HTTP message parser, reset timer. + """ assert not self.buffer self.chunk_handler = None self.set_terminator("\r\n\r\n") - if self.timeout is not None: - self.timer.set(self.timeout) - else: - self.timer.cancel() + self.update_timeout() def update_timeout(self): + """ + Put this stream's timer in known good state: set it to the + stream's timeout value if we're doing timeouts, otherwise clear + it. + """ if self.timeout is not None: self.timer.set(self.timeout) else: @@ -215,17 +278,36 @@ class http_stream(asynchat.async_chat): def collect_incoming_data(self, data): """ - Buffer the data + Buffer incoming data from asynchat. """ self.buffer.append(data) self.update_timeout() def get_buffer(self): + """ + Consume data buffered from asynchat. + """ val = "".join(self.buffer) self.buffer = [] return val def found_terminator(self): + """ + Asynchat reported that it found whatever terminator we set, so + figure out what to do next. This can be messy, because we can be + in any of several different states: + + @li We might be handling chunked HTTP, in which case we have to + initialize the chunk decoder; + + @li We might have found the end of the message body, in which case + we can (finally) process it; or + + @li We might have just gotten to the end of the message headers, + in which case we have to parse them to figure out which of three + separate mechanisms (chunked, content-length, TCP close) is going + to tell us how to find the end of the message body. + """ self.update_timeout() if self.chunk_handler: self.chunk_handler() @@ -243,6 +325,12 @@ class http_stream(asynchat.async_chat): self.handle_no_content_length() def chunk_header(self): + """ + Asynchat just handed us what should be the header of one chunk of + a chunked encoding stream. If this chunk has a body, set the + stream up to read it; otherwise, this is the last chunk, so start + the process of exiting the chunk decoder. + """ n = int(self.get_buffer().partition(";")[0], 16) self.log("Chunk length %s" % n) if n: @@ -253,6 +341,11 @@ class http_stream(asynchat.async_chat): self.chunk_handler = self.chunk_discard_trailer def chunk_body(self): + """ + Asynchat just handed us what should be the body of a chunk of the + body of a chunked message (sic). Save it, and prepare to move on + to the next chunk. + """ self.log("Chunk body") self.msg.body += self.buffer self.buffer = [] @@ -260,12 +353,20 @@ class http_stream(asynchat.async_chat): self.set_terminator("\r\n") def chunk_discard_crlf(self): + """ + Consume the CRLF that terminates a chunk, reinitialize chunk + decoder to be ready for the next chunk. + """ self.log("Chunk CRLF") s = self.get_buffer() assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) self.chunk_handler = self.chunk_header def chunk_discard_trailer(self): + """ + Consume chunk trailer, which should be empty, then (finally!) exit + the chunk decoder and hand complete message off to the application. + """ self.log("Chunk trailer") s = self.get_buffer() assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) @@ -273,10 +374,18 @@ class http_stream(asynchat.async_chat): self.handle_message() def handle_body(self): + """ + Hand normal (not chunked) message off to the application. + """ self.msg.body = self.get_buffer() self.handle_message() def handle_error(self): + """ + Asynchat (or asyncore, or somebody) raised an exception. See + whether it's one we should just pass along, otherwise log a stack + trace and close the stream. + """ etype = sys.exc_info()[0] if etype in (SystemExit, rpki.async.ExitNow): self.log("Caught %s, propagating" % etype.__name__) @@ -288,30 +397,64 @@ class http_stream(asynchat.async_chat): self.close(force = True) def handle_timeout(self): + """ + Inactivity timer expired, close connection with prejudice. + """ self.log("Timeout, closing") self.close(force = True) def handle_close(self): + """ + Wrapper around asynchat connection close handler, so that we can + log the event. + """ self.log("Close event in HTTP stream handler") asynchat.async_chat.handle_close(self) def send(self, data): + """ + TLS replacement for normal asyncore .send() method. Throw an + exception if TLS hasn't been started or if TLS I/O was already in + progress, otherwise hand off to the TLS code. + """ assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write) assert self.tls is not None return self.tls.write(data) def recv(self, buffer_size): + """ + TLS replacement for normal asyncore .recv() method. Throw an + exception if TLS hasn't been started or if TLS I/O was already in + progress, otherwise hand off to the TLS code. + """ assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write) assert self.tls is not None return self.tls.read(buffer_size) def readable(self): + """ + TLS replacement for normal asynchat .readable() method. A TLS + connection that's blocked waiting for TLS write is considered not + readable even if the underlying socket is. + """ return self.retry_read is not None or (self.retry_write is None and asynchat.async_chat.readable(self)) def writeable(self): + """ + TLS replacement for normal asynchat .writeable() method. A TLS + connection that's blocked waiting for TLS read is considered not + writeable even if the underlying socket is. + """ return self.retry_write is not None or (self.retry_read is None and asynchat.async_chat.writeable(self)) def handle_read(self): + """ + Asyncore says socket is readable. Make sure there's no TLS write + already in progress, retry previous read operation if we had one + that was waiting for more input, otherwise try to read some data, + and handle all the weird OpenSSL exceptions that the TLS code + throws. + """ assert self.retry_write is None, "%r: TLS I/O already in progress, w %r" % (self, self.retry_write) if self.retry_read is not None: thunk = self.retry_read @@ -333,7 +476,13 @@ class http_stream(asynchat.async_chat): self.close(force = True) def handle_write(self): - + """ + Asyncore says socket is writeable. Make sure there's no TLS read + already in progress, retry previous write operation if we had one + that was blocked on the socket, otherwise try to write some data. + Handling all the weird OpenSSL exceptions that TLS throws is our + caller's problem. + """ # This used to be an assertion, but apparently this can happen # without anything really being wrong, as a sort of race # condition, due to select() having signaled that a socket was @@ -342,7 +491,6 @@ class http_stream(asynchat.async_chat): if self.retry_read is not None: self.log("TLS I/O already in progress, r %r" % self.retry_read) return - if self.retry_write is not None: thunk = self.retry_write self.retry_write = None @@ -352,6 +500,10 @@ class http_stream(asynchat.async_chat): asynchat.async_chat.handle_write(self) def initiate_send(self): + """ + Initiate a write operation. This is just a wrapper around the + asynchat method, to handle all the whacky TLS exceptions. + """ assert self.retry_read is None and self.retry_write is None, "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write) try: asynchat.async_chat.initiate_send(self) @@ -367,6 +519,14 @@ class http_stream(asynchat.async_chat): self.close(force = True) def close(self, force = False): + """ + Close the stream. + + Graceful shutdown of a TLS connection requires multiple calls to + the underlying TLS code. If the connection should be closed right + now without waiting (perhaps because it's already dead and we're + just cleaning up), call with force = True. + """ self.log("Close requested") assert force or (self.retry_read is None and self.retry_write is None), "%r: TLS I/O already in progress, r %r w %r" % (self, self.retry_read, self.retry_write) if self.tls is not None: @@ -392,13 +552,23 @@ class http_stream(asynchat.async_chat): asynchat.async_chat.close(self) def log_cert(self, tag, x): + """ + Log HTTPS certificates, if certificate debugging is enabled. + """ if debug_tls_certs: rpki.log.debug("%r: HTTPS %s cert %r issuer %s [%s] subject %s [%s]" % (self, tag, x, x.getIssuer(), x.hAKI(), x.getSubject(), x.hSKI())) class http_server(http_stream): + """ + HTTP(S) server stream. + """ + ## @var parse_type + # Stream parser should look for incoming HTTP request messages. parse_type = http_request + ## @var timeout + # Use the default server timeout value set in the module header. timeout = default_server_timeout def __init__(self, sock, handlers, cert = None, key = None, ta = (), dynamic_ta = None): @@ -424,6 +594,19 @@ class http_server(http_stream): self.tls_accept() def tls_accept(self): + """ + Set up TLS for server side connection, handling all the whacky + OpenSSL exceptions from TLS. + + SSLErrorSSLError exceptions are particularly nasty, because all + too often they indicate a certificate lookup failure deep within + the guts of OpenSSL's TLS connection setup logic. Extracting + anything resembling a Python data structure from a handler called + that deep inside the OpenSSL TLS library, while theoretically + possible, runs a high risk of triggering some kind of memory leak + or corruption. So, for now, we just get back a long text string, + which we break up and log but don't attempt to process further. + """ try: self.tls.accept() except POW.WantReadError: @@ -442,6 +625,11 @@ class http_server(http_stream): raise def handle_no_content_length(self): + """ + Handle an incoming message that used neither chunking nor a + Content-Length header (that is: this message will be the last one + in this server stream). No special action required. + """ self.handle_message() def find_handler(self, path): @@ -454,6 +642,12 @@ class http_server(http_stream): return None def handle_message(self): + """ + TLS and HTTP layers managed to deliver a complete HTTP request to + us, figure out what to do with it. Check the command and + Content-Type, look for a handler, and if everything looks right, + pass the message body, path, and a reply callback to the handler. + """ self.log("Received request %s %s" % (self.msg.cmd, self.msg.path)) if not self.msg.persistent(): self.expect_close = True @@ -477,12 +671,24 @@ class http_server(http_stream): self.send_error(code = error[0], reason = error[1]) def send_error(self, code, reason): + """ + Send an error response to this request. + """ self.send_message(code = code, reason = reason) def send_reply(self, code, body): + """ + Send a reply to this request. + """ self.send_message(code = code, body = body) def send_message(self, code, reason = "OK", body = None): + """ + Queue up reply message. If both parties agree that connection is + persistant, and if no error occurred, restart this stream to + listen for next message; otherwise, queue up a close event for + this stream so it will shut down once the reply has been sent. + """ self.log("Sending response %s %s" % (code, reason)) if code >= 400: self.expect_close = True @@ -499,6 +705,9 @@ class http_server(http_stream): self.restart() class http_listener(asyncore.dispatcher): + """ + Listener for incoming HTTP(S) connections. + """ log = log_method @@ -524,6 +733,10 @@ class http_listener(asyncore.dispatcher): self.log("Listening on %r, handlers %r" % ((host, port), handlers)) def handle_accept(self): + """ + Asyncore says we have an incoming connection, spawn an http_server + stream for it and pass along all of our handler and TLS data. + """ self.log("Accepting connection") try: http_server(sock = self.accept()[0], handlers = self.handlers, cert = self.cert, key = self.key, ta = self.ta, dynamic_ta = self.dynamic_ta) @@ -533,6 +746,9 @@ class http_listener(asyncore.dispatcher): self.handle_error() def handle_error(self): + """ + Asyncore signaled an error, pass it along or log it. + """ if sys.exc_info()[0] is SystemExit: self.log("Caught SystemExit, propagating") raise @@ -541,9 +757,16 @@ class http_listener(asyncore.dispatcher): rpki.log.traceback() class http_client(http_stream): + """ + HTTP(S) client stream. + """ + ## @var parse_type + # Stream parser should look for incoming HTTP response messages. parse_type = http_response + ## @var timeout + # Use the default client timeout value set in the module header. timeout = default_client_timeout def __init__(self, queue, hostport, cert = None, key = None, ta = (), af = supported_address_families[0]): @@ -560,6 +783,9 @@ class http_client(http_stream): self.af = af def start(self): + """ + Create socket and request a connection. + """ try: self.create_socket(self.af, socket.SOCK_STREAM) self.connect(self.hostport) @@ -569,6 +795,9 @@ class http_client(http_stream): self.handle_error() def handle_connect(self): + """ + Asyncore says socket has connected, configure TLS junk. + """ self.log("Socket connected") self.tls = POW.Ssl(POW.TLSV1_CLIENT_METHOD) self.log_cert("client", self.cert) @@ -583,6 +812,9 @@ class http_client(http_stream): self.tls_connect() def tls_connect(self): + """ + Initialize client side of TLS. + """ try: self.tls.connect() except POW.WantReadError: @@ -595,13 +827,25 @@ class http_client(http_stream): self.queue.send_request() def set_state(self, state): + """ + Set HTTP client connection state. + """ self.log("State transition %s => %s" % (self.state, state)) self.state = state def handle_no_content_length(self): + """ + Handle response message that used neither chunking nor a + Content-Length header (that is: this message will be the last one + in this server stream). In this case we want to read until we + reach the end of the data stream. + """ self.set_terminator(None) def send_request(self, msg): + """ + Queue up request message and kickstart connection. + """ self.log("Sending request %r" % msg) assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) self.set_state("request-sent") @@ -610,6 +854,15 @@ class http_client(http_stream): self.restart() def handle_message(self): + """ + Handle incoming HTTP response message. Make sure we're in a state + where we expect to see such a message (and allow the mysterious + empty messages that Apache sends during connection close, no idea + what that is supposed to be about). If everybody agrees that the + connection should stay open, put it into an idle state; otherwise, + arrange for the stream to shut down. + """ + self.log("Message received, state %s" % self.state) if not self.msg.persistent(): @@ -637,6 +890,12 @@ class http_client(http_stream): self.queue.return_result(self.msg) def handle_close(self): + """ + Asyncore signaled connection close. If we were waiting for that + to find the end of a response message, process the resulting + message now; if we were waiting for the response to a request we + sent, signal the error. + """ http_stream.handle_close(self) self.log("State %s" % self.state) self.queue.detach(self) @@ -646,12 +905,21 @@ class http_client(http_stream): raise rpki.exceptions.HTTPSClientAborted, "HTTPS request aborted by close event" def handle_timeout(self): + """ + Connection idle timer has expired. If we weren't idle, log that + something bad has happened, then shut down the connection in any + case. + """ if self.state != "idle": self.log("Timeout while in state %s" % self.state) http_stream.handle_timeout(self) self.queue.detach(self) def handle_error(self): + """ + Asyncore says something threw an exception. Log it, then shut + down the connection and pass back the exception. + """ eclass, edata = sys.exc_info()[0:2] self.log("Error on HTTP client connection %r: %s %s" % (self.hostport, eclass, edata), rpki.log.warn) http_stream.handle_error(self) @@ -659,6 +927,11 @@ class http_client(http_stream): self.queue.return_result(edata) class http_queue(object): + """ + Queue of pending HTTP requests for a single destination. This class + is very tightly coupled to http_client; http_client handles the HTTP + stream itself, this class provides a slightly higher-level API. + """ log = log_method @@ -673,15 +946,25 @@ class http_queue(object): self.ta = ta def request(self, *requests): + """ + Append http_request object(s) to this queue. + """ self.log("Adding requests %r" % requests) self.queue.extend(requests) def restart(self): + """ + Send next request for this queue, if we can. This may involve + starting a new http_client stream, reusing an existing idle + stream, or just ignoring this request if there's an active client + stream already; in the last case, handling of the response (or + exception, or timeout) for the query currently in progress will + call this method when it's time to kick out the next query. + """ try: if self.client is None: - client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta) - self.log("Attaching client %r" % client) - self.client = client + self.client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta) + self.log("Attached client %r" % self.client) self.client.start() elif self.client.state == "idle": self.log("Sending request to existing client %r" % self.client) @@ -694,15 +977,30 @@ class http_queue(object): self.return_result(e) def send_request(self): + """ + Kick out the next query in this queue, if any. + """ if self.queue: self.client.send_request(self.queue[0]) - def detach(self, client): - if client is self.client: - self.log("Detaching client %r" % client) + def detach(self, client_): + """ + Detatch a client from this queue. Silently ignores attempting to + detach a client that is not attached to this queue, to simplify + handling of what otherwise would be a nasty set of race + conditions. + """ + if client_ is self.client: + self.log("Detaching client %r" % client_) self.client = None def return_result(self, result): + """ + Client stream has returned a result, which we need to pass along + to the original caller. Result may be either an HTTP response + message or an exception. In either case, once we're done + processing this result, kick off next message in the queue, if any. + """ if not self.queue: self.log("No caller, this should not happen. Dropping result %r" % result) @@ -729,6 +1027,8 @@ class http_queue(object): if self.queue: self.restart() +## @var client_queues +# Map of (host, port) tuples to http_queue objects. client_queues = {} def client(msg, client_key, client_cert, server_ta, url, callback, errback): @@ -785,7 +1085,7 @@ def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dy handlers = (("/", handlers),) if not isinstance(client_ta, (tuple, list)): - server_ta = (client_ta,) + client_ta = (client_ta,) for af in address_families: http_listener(port = port, host = host, handlers = handlers, cert = server_cert, key = server_key, ta = client_ta, dynamic_ta = dynamic_https_trust_anchor, af = af) @@ -824,6 +1124,9 @@ class caller(object): def __call__(self, cb, eb, *pdus): def done(cms): + """ + Handle CMS-wrapped XML response message. + """ result = self.proto.cms_msg.unwrap(cms, (self.server_ta, self.server_cert), pretty_print = self.debug) if self.debug: msg, xml = result diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 1f0920b6..d19f9c45 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -598,7 +598,7 @@ class repository_elt(data_elt): """ pdu.raise_if_error() - def call_pubd(self, callback, errback, q_msg, handlers = {}): + def call_pubd(self, callback, errback, q_msg, handlers = None): """ Send a message to publication daemon and return the response. @@ -619,6 +619,9 @@ class repository_elt(data_elt): if not q_msg: return callback() + if handlers is None: + handlers = {} + for q_pdu in q_msg: rpki.log.info("Sending <%s %r %r> to pubd" % (q_pdu.action, q_pdu.uri, q_pdu.payload)) diff --git a/rpkid/rpki/sundial.py b/rpkid/rpki/sundial.py index 4319a124..fe5623b9 100644 --- a/rpkid/rpki/sundial.py +++ b/rpkid/rpki/sundial.py @@ -160,10 +160,10 @@ class datetime(pydatetime.datetime): """Return the earlier of two timestamps.""" return other if other < self else self - def __add__(x, y): return _cast(pydatetime.datetime.__add__(x, y)) - def __radd__(x, y): return _cast(pydatetime.datetime.__radd__(x, y)) - def __rsub__(x, y): return _cast(pydatetime.datetime.__rsub__(x, y)) - def __sub__(x, y): return _cast(pydatetime.datetime.__sub__(x, y)) + def __add__(self, y): return _cast(pydatetime.datetime.__add__(self, y)) + def __radd__(self, y): return _cast(pydatetime.datetime.__radd__(self, y)) + def __rsub__(self, y): return _cast(pydatetime.datetime.__rsub__(self, y)) + def __sub__(self, y): return _cast(pydatetime.datetime.__sub__(self, y)) class timedelta(pydatetime.timedelta): """ @@ -240,19 +240,19 @@ class timedelta(pydatetime.timedelta): """Convert a datetime.timedelta object into this subclass.""" return cls(days = x.days, seconds = x.seconds, microseconds = x.microseconds) - def __abs__(x): return _cast(pydatetime.timedelta.__abs__(x)) - def __add__(x, y): return _cast(pydatetime.timedelta.__add__(x, y)) - def __div__(x, y): return _cast(pydatetime.timedelta.__div__(x, y)) - def __floordiv__(x, y): return _cast(pydatetime.timedelta.__floordiv__(x, y)) - def __mul__(x, y): return _cast(pydatetime.timedelta.__mul__(x, y)) - def __neg__(x): return _cast(pydatetime.timedelta.__neg__(x)) - def __pos__(x): return _cast(pydatetime.timedelta.__pos__(x)) - def __radd__(x, y): return _cast(pydatetime.timedelta.__radd__(x, y)) - def __rdiv__(x, y): return _cast(pydatetime.timedelta.__rdiv__(x, y)) - def __rfloordiv__(x, y): return _cast(pydatetime.timedelta.__rfloordiv__(x, y)) - def __rmul__(x, y): return _cast(pydatetime.timedelta.__rmul__(x, y)) - def __rsub__(x, y): return _cast(pydatetime.timedelta.__rsub__(x, y)) - def __sub__(x, y): return _cast(pydatetime.timedelta.__sub__(x, y)) + def __abs__(self): return _cast(pydatetime.timedelta.__abs__(self)) + def __add__(self, x): return _cast(pydatetime.timedelta.__add__(self, x)) + def __div__(self, x): return _cast(pydatetime.timedelta.__div__(self, x)) + def __floordiv__(self, x): return _cast(pydatetime.timedelta.__floordiv__(self, x)) + def __mul__(self, x): return _cast(pydatetime.timedelta.__mul__(self, x)) + def __neg__(self): return _cast(pydatetime.timedelta.__neg__(self)) + def __pos__(self): return _cast(pydatetime.timedelta.__pos__(self)) + def __radd__(self, x): return _cast(pydatetime.timedelta.__radd__(self, x)) + def __rdiv__(self, x): return _cast(pydatetime.timedelta.__rdiv__(self, x)) + def __rfloordiv__(self, x): return _cast(pydatetime.timedelta.__rfloordiv__(self, x)) + def __rmul__(self, x): return _cast(pydatetime.timedelta.__rmul__(self, x)) + def __rsub__(self, x): return _cast(pydatetime.timedelta.__rsub__(self, x)) + def __sub__(self, x): return _cast(pydatetime.timedelta.__sub__(self, x)) def _cast(x): """ diff --git a/scripts/pylint.rc b/scripts/pylint.rc index 08f76fbb..5e555f45 100644 --- a/scripts/pylint.rc +++ b/scripts/pylint.rc @@ -32,7 +32,7 @@ disable-msg-cat= #enable-msg= # Disable the message(s) with the given id(s). -disable-msg=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212 +disable-msg=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212,R0914,W0603 [REPORTS] |