aboutsummaryrefslogtreecommitdiff
path: root/rp/rcynic/rcynicng
diff options
context:
space:
mode:
Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-xrp/rcynic/rcynicng140
1 files changed, 97 insertions, 43 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 3e32e286..cfcfd133 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -21,6 +21,7 @@ import tornado.locks
import tornado.ioloop
import tornado.queues
import tornado.process
+import tornado.httpclient
import rpki.POW
import rpki.sundial
@@ -135,7 +136,7 @@ class X509(rpki.POW.X509):
self = cls.derRead(der)
aki = self.getAKI()
ski = self.getSKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -257,7 +258,7 @@ class CRL(rpki.POW.CRL):
def store_if_new(cls, der, uri, retrieval):
self = cls.derRead(der)
aki = self.getAKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -327,7 +328,7 @@ class CMS_Mixin(object):
cert = self.certs()[0]
aki = cert.getAKI()
ski = cert.getSKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -484,7 +485,7 @@ def uri_to_class(uri):
# https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering
def fetch_objects(**kwargs):
- for obj in rpki.rcynicdb.models.RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
+ for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
cls = uri_to_class(obj.uri)
if cls is not None:
yield cls.load(obj)
@@ -516,10 +517,15 @@ class WalkFrame(object):
@tornado.gen.coroutine
def initial(self, wsk):
- # Will need some logic to decide whether to prefer RRDP or rsync.
- # For the moment, we've only implemented rsync, so this is easy.
+ rsync_uri = first_rsync_uri(self.cer.caDirectory)
+ rrdp_uri = first_https_uri(self.cer.rpkiNotify)
- self.fetcher = Fetcher(first_rsync_uri(self.cer.caDirectory))
+ if args.prefer_rsync:
+ uri = rsync_uri or rrdp_uri
+ else:
+ uri = rrdp_uri or rsync_uri
+
+ self.fetcher = Fetcher(uri)
if not self.fetcher.needed():
self.state = self.ready
@@ -720,6 +726,9 @@ def first_uri(uris, scheme):
def first_rsync_uri(uris):
return first_uri(uris, "rsync://")
+def first_https_uri(uris):
+ return first_uri(uris, "https://")
+
def sha256(bytes):
d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
d.update(bytes)
@@ -864,7 +873,7 @@ class Fetcher(object):
# Should do something with rsync result and validation status database here.
- retrieval = rpki.rcynicdb.models.Retrieval.objects.create(
+ retrieval = Retrieval.objects.create(
uri = self.uri,
started = rpki.sundial.datetime.fromtimestamp(t0),
finished = rpki.sundial.datetime.fromtimestamp(t1),
@@ -901,6 +910,15 @@ class Fetcher(object):
if urlparse.urlparse(url).netloc in self._https_deadhosts:
raise DeadHost
+ # Should do something with deadhost processing below. Looks
+ # like errors such as HTTP timeout show up as
+ # tornado.httpclient.HTTPError exceptions (which we could
+ # suppress if we wanted to do so, but we probably don't).
+ # HTTP timeout shows up in the logs as "HTTP 599". See doc for:
+ #
+ # tornado.httpclient.AsyncHTTPClient.fetch()
+ # tornado.httpclient.HTTPError
+
try:
t0 = time.time()
client = tornado.httpclient.AsyncHTTPClient()
@@ -916,13 +934,13 @@ class Fetcher(object):
finally:
t1 = time.time()
logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
- retrieval = rpki.rcynicdb.models.Retrieval.objects.create(
+ retrieval = Retrieval.objects.create(
uri = url,
started = rpki.sundial.datetime.fromtimestamp(t0),
finished = rpki.sundial.datetime.fromtimestamp(t1),
successful = ok)
- raise tornado.gen.Return(retrieval, response.body if ok else None)
+ raise tornado.gen.Return((retrieval, response.body if ok else None))
@tornado.gen.coroutine
def _https_fetch(self):
@@ -950,7 +968,7 @@ class Fetcher(object):
pending.notify_all()
@tornado.gen.coroutine
- def _rrdp_fetch_url(self, url, tag, hash = None, uuid = None, serial = None):
+ def _rrdp_fetch_url(self, url, tag, hash = None, session_id = None, serial = None):
retrieval, xml = yield self._https_fetch_url(url)
@@ -965,15 +983,15 @@ class Fetcher(object):
raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % (
tag, delta_url, xml.tag))
- if uuid is not None and xml.get("uuid") != uuid:
- raise RRDP_ParseFailure("Expected RRDP UUID %s for %s, got %s" % (
- uuid, url, xml.get("uuid")))
+ if session_id is not None and xml.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id %s for %s, got %s" % (
+ session_id, url, xml.get("session_id")))
if serial is not None and long(xml.get("serial")) != long(serial):
raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % (
serial, url, xml.get("serial")))
- raise tornado.gen.Return(retrieval, xml)
+ raise tornado.gen.Return((retrieval, xml))
@tornado.gen.coroutine
def _rrdp_fetch(self):
@@ -988,32 +1006,37 @@ class Fetcher(object):
return
self.pending = tornado.locks.Condition()
- self._rsync_history[self.uri] = self
+ self._https_history[self.uri] = self
try:
retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification")
- uuid = notification.get("uuid")
+ session_id = notification.get("session_id")
serial = long(notification.get("serial"))
- old_snapshot = rpki.rcynicdb.models.RRDPSnapshot.objects.filter(
- uuid = uuid).order_by("-retrieved__started").first()
+ old_snapshot = RRDPSnapshot.objects.filter(
+ session_id = session_id).order_by("-retrieved__started").first()
- if old_snapshot is not None and old_snapshot.serial == serial:
- return # We're up to date, nothing to do
+ logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r",
+ self.uri, session_id, serial, old_snapshot)
+ if old_snapshot is not None and old_snapshot.serial == serial:
+ logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
+ return
+
delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
- new_snapshot = rpki.rcynicdb.models.RRDPSnapshot(retrieved = retrieval, uuid = uuid, serial = serial)
+ new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial)
if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials:
- # No useful combination of old snapshot and available deltas, need to load current snapshot.
x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
+ logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri"))
+
retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"),
- tag = "snapshot", uuid = uuid, serial = serial)
+ tag = "snapshot", session_id = session_id, serial = serial)
with transaction.atomic():
new_snapshot.save()
@@ -1027,17 +1050,17 @@ class Fetcher(object):
obj.save()
else:
- # In theory there exists a set of deltas which will get us from old_snapshot to new_snapshot.
+ logger.debug("RRDP %s loading %s deltas", self.uri, (new_snapshot.serial - old_snapshot.serial))
- deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
- hash = delta_serials[serial][1],
- tag = "delta",
- uuid = uuid,
- serial = serial))
+ deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
+ hash = delta_serials[serial][1],
+ tag = "delta",
+ session_id = session_id,
+ serial = serial))
for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)]
with transaction.atomic():
- new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set
+ new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all()
new_snapshot.save()
for retrieval, delta in deltas:
@@ -1145,33 +1168,54 @@ def final_report():
def final_cleanup():
from django.db import transaction
+ def report(when):
+ logger.debug("Database state %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval",
+ when,
+ Authenticated.objects.all().count(),
+ RRDPSnapshot.objects.all().count(),
+ RPKIObject.objects.all().count(),
+ Retrieval.objects.all().count())
+
+ report("before")
+
with transaction.atomic():
- # Flush old authenticated sets.
+ #logger.debug("Flushing old authenticated sets")
- q = rpki.rcynicdb.models.Authenticated.objects.exclude(id = authenticated.id)
+ q = Authenticated.objects
+ q = q.exclude(id = authenticated.id)
q.delete()
- # Flush RRDP snapshots which don't contribute anything to the current authenticated set.
+ #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set")
- q = authenticated.rpkiobject_set.order_by("snapshot__id").values_list("snapshot__id", flat = True)
- q = rpki.rcynicdb.models.RRDPSnapshot.objects.exclude(id__in = q.distinct())
+ q = RPKIObject.objects
+ q = q.filter(authenticated = authenticated.id)
+ q = q.exclude(snapshot = None)
+ q = q.order_by("snapshot__id")
+ q = q.values_list("snapshot__id", flat = True)
+ q = q.distinct()
+ q = RRDPSnapshot.objects.exclude(id__in = q)
q.delete()
- # Flush RPKI objects which are neither part of the current authenticated set nor part of
- # a current RRDP snapshot.
+ #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")
- q = rpki.rcynicdb.models.RPKIObject.objects
+ q = RPKIObject.objects
q = q.exclude(authenticated = authenticated.id)
q = q.filter(snapshot = None)
q.delete()
- # Flush retrieval objects which are no longer related to any RPKI objects.
+ #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot")
- q = rpki.rcynicdb.models.RPKIObject.objects.order_by("retrieved__id").values_list("retrieved__id", flat = True)
- q = rpki.rcynicdb.models.Retrieval.objects.exclude(id__in = q.distinct())
+ q = RPKIObject.objects
+ q = q.order_by("retrieved__id")
+ q = q.values_list("retrieved__id", flat = True)
+ q = q.distinct()
+ q = Retrieval.objects.exclude(id__in = q)
+ q = q.filter(rrdpsnapshot = None)
q.delete()
+ report("after")
+
@tornado.gen.coroutine
def launcher():
@@ -1211,6 +1255,8 @@ def main():
parser.add_argument("--no-spawn-on-fetch", action = "store_true")
parser.add_argument("--no-migrate", action = "store_true")
+ parser.add_argument("--prefer-rsync", action = "store_true")
+
global args
args = parser.parse_args()
@@ -1231,11 +1277,19 @@ def main():
django.core.management.call_command("migrate", verbosity = 0, interactive = False)
import rpki.rcynicdb
+ global Retrieval
+ global Authenticated
+ global RRDPSnapshot
+ global RPKIObject
+ Retrieval = rpki.rcynicdb.models.Retrieval
+ Authenticated = rpki.rcynicdb.models.Authenticated
+ RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot
+ RPKIObject = rpki.rcynicdb.models.RPKIObject
logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s", datefmt = "%Y-%m-%d %H:%M:%S")
global authenticated
- authenticated = rpki.rcynicdb.models.Authenticated.objects.create(started = rpki.sundial.datetime.now())
+ authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now())
global task_queue
task_queue = tornado.queues.Queue()