diff options
author | Rob Austein <sra@hactrn.net> | 2016-02-20 01:55:08 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-02-20 01:55:08 +0000 |
commit | 2d94f2e729e777cc05ba565fb48df6d057759aab (patch) | |
tree | 8829fb9f25607697ce6946c6027e3d0086b3f813 /rpki/rpkid.py | |
parent | cb402fb8ed1cf963182c384eb6649dfc673a7832 (diff) |
Apparently we have to set the timeout in the HTTPRequest object.
svn path=/branches/tk705/; revision=6275
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r-- | rpki/rpkid.py | 76 |
1 files changed, 67 insertions, 9 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index a82195b3..a518797c 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -137,7 +137,7 @@ class main(object): self.http_server_host = self.cfg.get("server-host", "") self.http_server_port = self.cfg.getint("server-port") - self.http_client_timeout = self.cfg.getint("http-client-timeout", 300) + self.http_client_timeout = self.cfg.getint("http-client-timeout", 900) self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) @@ -337,9 +337,19 @@ class main(object): http_client = tornado.httpclient.AsyncHTTPClient() with (yield lock.acquire()): - response = yield http_client.fetch(request, - connect_timeout = self.http_client_timeout, - request_timeout = self.http_client_timeout) + try: + started = time.time() + response = yield http_client.fetch(request) + except tornado.httpclient.HTTPError as e: + + # XXX This is not a solution, just an attempt to + # gather data on whether the timeout arguments are + # working as expected. + + logger.warning("%r: HTTP error contacting %r: %s", self, request, e) + if e.code == 599: + logger.warning("%r: HTTP timeout after time %s seconds", self, time.time() - started) + raise raise tornado.gen.Return(response) @@ -363,10 +373,12 @@ class main(object): q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) http_request = tornado.httpclient.HTTPRequest( - url = self.irdb_url, - method = "POST", - body = q_der, - headers = { "Content-Type" : rpki.left_right.content_type }) + url = self.irdb_url, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.left_right.content_type }, + connect_timeout = self.http_client_timeout, + request_timeout = self.http_client_timeout) http_response = yield self.http_fetch(http_request) @@ -743,12 +755,58 @@ class publication_queue(object): def call_pubd(self): for rid in self.repositories: logger.debug("Calling pubd[%r]", self.repositories[rid]) - yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + try: + yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + except (rpki.exceptions.ExistingObjectAtURI, rpki.exceptions.DifferentObjectAtURI, rpki.exceptions.NoObjectAtURI) as e: + logger.warn("Lost synchronization with %r: %s", self.repositories[rid], e) + yield self.resync(self.repositories[rid]) for k in self.uris.iterkeys(): if self._inplay.get(k) is self: del self._inplay[k] self.clear() + @tornado.gen.coroutine + def resync(self, repository): + logger.info("Attempting resynchronization with %r", repository) + + # A lot of this is copy and paste from .serve_publish_world_now(). + # Refactor when we have more of a clue about how this should work. + + q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + SubElement(q_msg, rpki.publication.tag_list, tag = "list") + r_msg = yield repository.call_pubd(self.rpkid, q_msg, length_check = False) + + if not all(r_pdu.tag == rpki.publication.tag_list for r_pdu in r_msg): + raise rpki.exceptions.BadPublicationReply("Unexpected XML tag in publication response") + + pubd_objs = dict((r_pdu.get("uri"), r_pdu.get("hash")) for r_pdu in r_msg) + + our_objs = [] + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = repository.tenant, state = "active"): + our_objs = [(ca_detail.crl_uri, ca_detail.latest_crl), + (ca_detail.manifest_uri, ca_detail.latest_manifest)] + our_objs.extend((c.uri, c.cert) for c in ca_detail.child_certs.all()) + our_objs.extend((r.uri, r.roa) for r in ca_detail.roas.filter(roa__isnull = False)) + our_objs.extend((g.uri, g.ghostbuster) for g in ca_detail.ghostbusters.all()) + our_objs.extend((c.uri, c.cert) for c in ca_detail.ee_certificates.all()) + + q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + + for uri, obj in our_objs: + if uri not in pubd_objs: + SubElement(q_msg, rpki.publication.tag_publish, uri = uri).text = obj.get_Base64() + else: + h = pubd_objs.pop(uri) + if h != rpki.x509.sha256(obj.get_DER()).encode("hex"): + SubElement(q_msg, rpki.publication.tag_publish, uri = uri, hash = h).text = obj.get_Base64() + + for uri, h in pubd_objs.iteritems(): + SubElement(q_msg, rpki.publication.tag_withdraw, uri = uri, hash = h) + + yield repository.call_pubd(self.rpkid, q_msg) + @property def size(self): return sum(len(self.msgs[rid]) for rid in self.repositories) |