about summary refs log tree commit diff
diff options
context:
space:
mode:
author Rob Austein <sra@hactrn.net> 2016-03-01 03:32:52 +0000
committer Rob Austein <sra@hactrn.net> 2016-03-01 03:32:52 +0000
commit f0f6dd260f945e3194a47d3581443c39c828e08e (patch)
tree 49f7d790f1238cade33a8a18a66b75dd9b3408a1
parent 633a222f72c2818ade32bc8d06c069fc675c88f3 (diff)
Much faster loading of large RRDP snapshots. Code is messy and needs
cleanup, but runs more than an order of magnitude faster in the common case where the vast majority of objects in the snapshot are new, and (in theory) should not be significantly slower than what we were already doing for more complicated mixtures of new and existing objects. svn path=/branches/tk705/; revision=6303
-rwxr-xr-x rp/rcynic/rcynicng 174
1 files changed, 96 insertions, 78 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 4dc0c5f9..0ef7c25b 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -135,27 +135,25 @@ class X509StoreCTX(rpki.POW.X509StoreCTX):
return ok
-class X509(rpki.POW.X509):
-
- def __repr__(self):
- try:
- return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self))
- except:
- return "<X509 at 0x{:x}>".format(id(self))
+class POW_Mixin(object):
@classmethod
def store_if_new(cls, der, uri, retrieval):
self = cls.derRead(der)
- aki = self.getAKI()
- ski = self.getSKI()
+ ski, aki = self.get_hex_SKI_AKI()
return RPKIObject.objects.get_or_create(
- der = der,
- defaults = dict(
- uri = uri,
- aki = "" if aki is None else aki.encode("hex"),
- ski = "" if ski is None else ski.encode("hex"),
- sha256 = sha256hex(der),
- retrieved = retrieval))
+ der = der,
+ defaults = dict(uri = uri,
+ aki = aki,
+ ski = ski,
+ sha256 = sha256hex(der),
+ retrieved = retrieval))
+
+ def get_hex_SKI_AKI(self):
+ cer = self.certs()[0]
+ ski = cer.getSKI()
+ aki = cer.getAKI()
+ return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
@property
def uri(self):
@@ -169,6 +167,20 @@ class X509(rpki.POW.X509):
def ski(self):
return self.obj.ski
+
+class X509(rpki.POW.X509, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<X509 at 0x{:x}>".format(id(self))
+
+ def get_hex_SKI_AKI(self):
+ ski = self.getSKI()
+ aki = self.getAKI()
+ return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
+
@classmethod
def load(cls, obj, cms = None):
if cms is not None:
@@ -258,7 +270,7 @@ class X509(rpki.POW.X509):
return not any(s.kind == "bad" for s in status)
-class CRL(rpki.POW.CRL):
+class CRL(rpki.POW.CRL, POW_Mixin):
def __repr__(self):
try:
@@ -266,30 +278,9 @@ class CRL(rpki.POW.CRL):
except:
return "<CRL at 0x{:x}>".format(id(self))
- @classmethod
- def store_if_new(cls, der, uri, retrieval):
- self = cls.derRead(der)
- aki = self.getAKI()
- return RPKIObject.objects.get_or_create(
- der = der,
- defaults = dict(
- uri = uri,
- aki = "" if aki is None else aki.encode("hex"),
- ski = "",
- sha256 = sha256hex(der),
- retrieved = retrieval))
-
- @property
- def uri(self):
- return self.obj.uri
-
- @property
- def aki(self):
- return self.obj.aki
-
- @property
- def ski(self):
- return ""
+ def get_hex_SKI_AKI(self):
+ aki = self.getAKI()
+ return "", aki.encode("hex") if aki else ""
@classmethod
def load(cls, obj):
@@ -332,37 +323,7 @@ class CRL(rpki.POW.CRL):
return not any(s.kind == "bad" for s in status)
-class CMS_Mixin(object):
-
- @classmethod
- def store_if_new(cls, der, uri, retrieval):
- self = cls.derRead(der)
- cert = self.certs()[0]
- aki = cert.getAKI()
- ski = cert.getSKI()
- return RPKIObject.objects.get_or_create(
- der = der,
- defaults = dict(
- uri = uri,
- aki = "" if aki is None else aki.encode("hex"),
- ski = "" if ski is None else ski.encode("hex"),
- sha256 = sha256hex(der),
- retrieved = retrieval))
-
- @property
- def uri(self):
- return self.obj.uri
-
- @property
- def aki(self):
- return self.obj.aki
-
- @property
- def ski(self):
- return self.obj.ski
-
-
-class Ghostbuster(rpki.POW.CMS, CMS_Mixin):
+class Ghostbuster(rpki.POW.CMS, POW_Mixin):
def __repr__(self):
try:
@@ -391,7 +352,7 @@ class Ghostbuster(rpki.POW.CMS, CMS_Mixin):
return not any(s.kind == "bad" for s in status)
-class Manifest(rpki.POW.Manifest, CMS_Mixin):
+class Manifest(rpki.POW.Manifest, POW_Mixin):
def __repr__(self):
try:
@@ -441,7 +402,7 @@ class Manifest(rpki.POW.Manifest, CMS_Mixin):
yield digest.encode("hex")
-class ROA(rpki.POW.ROA, CMS_Mixin):
+class ROA(rpki.POW.ROA, POW_Mixin):
def __repr__(self):
try:
@@ -1033,7 +994,7 @@ class Fetcher(object):
@tornado.gen.coroutine
def _rrdp_fetch(self):
- from django.db import transaction
+ from django.db import transaction, IntegrityError
if args.no_fetch:
return
@@ -1082,6 +1043,9 @@ class Fetcher(object):
root = None
count = 0
+ bulk_old = []
+ bulk_new = []
+ chunk = 10000
for event, node in iterparse(xml_file):
if node is root:
@@ -1120,24 +1084,78 @@ class Fetcher(object):
# know that it would be faster, but it might, since read operations are probably faster than
# reads interleaved with writes.
#
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists
+ # http://stackoverflow.com/questions/9475241/split-python-string-every-nth-character
+ # https://docs.djangoproject.com/en/1.9/ref/models/fields/#manytomanyfield
# https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#get
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#values-list
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated
# https://docs.djangoproject.com/en/1.9/ref/models/relations/
+ # https://docs.djangoproject.com/en/1.9/topics/db/models/#many-to-many-relationships
+ # https://docs.djangoproject.com/en/1.9/topics/db/queries/#additional-methods-to-handle-related-objects
+ # https://docs.djangoproject.com/en/1.9/topics/db/queries/#backwards-related-objects
+ # https://docs.djangoproject.com/en/1.9/topics/db/queries/#limiting-querysets
#
# ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh.
# Well, maybe we can work around that, since "retrieval" already gives us the exact set of
# objects for which we'd need to patch up the many-to-many relationship pointer. Depends
# on what they mean by "doesn't work": automation not complete vs mysterious failure.
- obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(snapshot)
+ der = node.text.decode("base64")
+ ski, aki = cls.derRead(der).get_hex_SKI_AKI()
+ bulk_new.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
+ retrieved = retrieval, sha256 = sha256hex(der)))
node.clear()
while node.getprevious() is not None:
del root[0]
+ if len(bulk_new) > chunk:
+ logger.debug("Bulk creation of new RPKIObjects")
+ try:
+ RPKIObject.objects.bulk_create(bulk_new)
+ except IntegrityError:
+ logger.debug("Some objects already existed, weeding and retrying")
+ i = 0
+ while i < len(bulk_new):
+ try:
+ bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der))
+ except RPKIObject.DoesNotExist:
+ i += 1
+ else:
+ del bulk_new[i]
+ RPKIObject.objects.bulk_create(bulk_new)
+ del bulk_new[:]
+
yield tornado.gen.moment
+ if len(bulk_new) > 0:
+ logger.debug("Bulk creation of new RPKIObjects")
+ try:
+ RPKIObject.objects.bulk_create(bulk_new)
+ except IntegrityError:
+ logger.debug("Some objects already existed, weeding and retrying")
+ i = 0
+ while i < len(bulk_new):
+ try:
+ bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der))
+ except RPKIObject.DoesNotExist:
+ i += 1
+ else:
+ del bulk_new[i]
+ RPKIObject.objects.bulk_create(bulk_new)
+
+ n = len(bulk_old)
+ for i in xrange(0, n, chunk):
+ logger.debug("Bulk addition of existing RPKIObjects to RRDPSnapshot")
+ snapshot.rpkiobject_set.add(*bulk_old[i : i + chunk])
+
+ n = retrieval.rpkiobject_set.count()
+ for i in xrange(0, n, chunk):
+ logger.debug("Bulk addition of new RPKIObjects to RRDPSnapshot")
+ snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk]))
+
snapshot.retrieved = retrieval
snapshot.save()