aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r--rpki/rpkid.py76
1 files changed, 67 insertions, 9 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index a82195b3..a518797c 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -137,7 +137,7 @@ class main(object):
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
- self.http_client_timeout = self.cfg.getint("http-client-timeout", 300)
+ self.http_client_timeout = self.cfg.getint("http-client-timeout", 900)
self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
@@ -337,9 +337,19 @@ class main(object):
http_client = tornado.httpclient.AsyncHTTPClient()
with (yield lock.acquire()):
- response = yield http_client.fetch(request,
- connect_timeout = self.http_client_timeout,
- request_timeout = self.http_client_timeout)
+ try:
+ started = time.time()
+ response = yield http_client.fetch(request)
+ except tornado.httpclient.HTTPError as e:
+
+ # XXX This is not a solution, just an attempt to
+ # gather data on whether the timeout arguments are
+ # working as expected.
+
+ logger.warning("%r: HTTP error contacting %r: %s", self, request, e)
+ if e.code == 599:
+ logger.warning("%r: HTTP timeout after time %s seconds", self, time.time() - started)
+ raise
raise tornado.gen.Return(response)
@@ -363,10 +373,12 @@ class main(object):
q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
http_request = tornado.httpclient.HTTPRequest(
- url = self.irdb_url,
- method = "POST",
- body = q_der,
- headers = { "Content-Type" : rpki.left_right.content_type })
+ url = self.irdb_url,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.left_right.content_type },
+ connect_timeout = self.http_client_timeout,
+ request_timeout = self.http_client_timeout)
http_response = yield self.http_fetch(http_request)
@@ -743,12 +755,58 @@ class publication_queue(object):
def call_pubd(self):
for rid in self.repositories:
logger.debug("Calling pubd[%r]", self.repositories[rid])
- yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers)
+ try:
+ yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers)
+ except (rpki.exceptions.ExistingObjectAtURI, rpki.exceptions.DifferentObjectAtURI, rpki.exceptions.NoObjectAtURI) as e:
+ logger.warn("Lost synchronization with %r: %s", self.repositories[rid], e)
+ yield self.resync(self.repositories[rid])
for k in self.uris.iterkeys():
if self._inplay.get(k) is self:
del self._inplay[k]
self.clear()
+ @tornado.gen.coroutine
+ def resync(self, repository):
+ logger.info("Attempting resynchronization with %r", repository)
+
+ # A lot of this is copy and paste from .serve_publish_world_now().
+ # Refactor when we have more of a clue about how this should work.
+
+ q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
+ type = "query", version = rpki.publication.version)
+ SubElement(q_msg, rpki.publication.tag_list, tag = "list")
+ r_msg = yield repository.call_pubd(self.rpkid, q_msg, length_check = False)
+
+ if not all(r_pdu.tag == rpki.publication.tag_list for r_pdu in r_msg):
+ raise rpki.exceptions.BadPublicationReply("Unexpected XML tag in publication response")
+
+ pubd_objs = dict((r_pdu.get("uri"), r_pdu.get("hash")) for r_pdu in r_msg)
+
+ our_objs = []
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = repository.tenant, state = "active"):
+ our_objs = [(ca_detail.crl_uri, ca_detail.latest_crl),
+ (ca_detail.manifest_uri, ca_detail.latest_manifest)]
+ our_objs.extend((c.uri, c.cert) for c in ca_detail.child_certs.all())
+ our_objs.extend((r.uri, r.roa) for r in ca_detail.roas.filter(roa__isnull = False))
+ our_objs.extend((g.uri, g.ghostbuster) for g in ca_detail.ghostbusters.all())
+ our_objs.extend((c.uri, c.cert) for c in ca_detail.ee_certificates.all())
+
+ q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
+ type = "query", version = rpki.publication.version)
+
+ for uri, obj in our_objs:
+ if uri not in pubd_objs:
+ SubElement(q_msg, rpki.publication.tag_publish, uri = uri).text = obj.get_Base64()
+ else:
+ h = pubd_objs.pop(uri)
+ if h != rpki.x509.sha256(obj.get_DER()).encode("hex"):
+ SubElement(q_msg, rpki.publication.tag_publish, uri = uri, hash = h).text = obj.get_Base64()
+
+ for uri, h in pubd_objs.iteritems():
+ SubElement(q_msg, rpki.publication.tag_withdraw, uri = uri, hash = h)
+
+ yield repository.call_pubd(self.rpkid, q_msg)
+
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)