diff options
author | Rob Austein <sra@hactrn.net> | 2016-03-01 03:32:52 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-03-01 03:32:52 +0000 |
commit | f0f6dd260f945e3194a47d3581443c39c828e08e (patch) | |
tree | 49f7d790f1238cade33a8a18a66b75dd9b3408a1 /rp | |
parent | 633a222f72c2818ade32bc8d06c069fc675c88f3 (diff) |
Much faster loading of large RRDP snapshots. The code is messy and needs
cleanup, but it runs more than an order of magnitude faster in the common
case where the vast majority of objects in the snapshot are new, and
(in theory) should not be significantly slower than what we were doing
already for more complicated mixtures of new and already-stored objects.
svn path=/branches/tk705/; revision=6303
Diffstat (limited to 'rp')
-rwxr-xr-x | rp/rcynic/rcynicng | 174 |
1 files changed, 96 insertions, 78 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 4dc0c5f9..0ef7c25b 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -135,27 +135,25 @@ class X509StoreCTX(rpki.POW.X509StoreCTX): return ok -class X509(rpki.POW.X509): - - def __repr__(self): - try: - return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self)) - except: - return "<X509 at 0x{:x}>".format(id(self)) +class POW_Mixin(object): @classmethod def store_if_new(cls, der, uri, retrieval): self = cls.derRead(der) - aki = self.getAKI() - ski = self.getSKI() + ski, aki = self.get_hex_SKI_AKI() return RPKIObject.objects.get_or_create( - der = der, - defaults = dict( - uri = uri, - aki = "" if aki is None else aki.encode("hex"), - ski = "" if ski is None else ski.encode("hex"), - sha256 = sha256hex(der), - retrieved = retrieval)) + der = der, + defaults = dict(uri = uri, + aki = aki, + ski = ski, + sha256 = sha256hex(der), + retrieved = retrieval)) + + def get_hex_SKI_AKI(self): + cer = self.certs()[0] + ski = cer.getSKI() + aki = cer.getAKI() + return ski.encode("hex") if ski else "", aki.encode("hex") if aki else "" @property def uri(self): @@ -169,6 +167,20 @@ class X509(rpki.POW.X509): def ski(self): return self.obj.ski + +class X509(rpki.POW.X509, POW_Mixin): + + def __repr__(self): + try: + return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self)) + except: + return "<X509 at 0x{:x}>".format(id(self)) + + def get_hex_SKI_AKI(self): + ski = self.getSKI() + aki = self.getAKI() + return ski.encode("hex") if ski else "", aki.encode("hex") if aki else "" + @classmethod def load(cls, obj, cms = None): if cms is not None: @@ -258,7 +270,7 @@ class X509(rpki.POW.X509): return not any(s.kind == "bad" for s in status) -class CRL(rpki.POW.CRL): +class CRL(rpki.POW.CRL, POW_Mixin): def __repr__(self): try: @@ -266,30 +278,9 @@ class CRL(rpki.POW.CRL): except: return "<CRL at 0x{:x}>".format(id(self)) - @classmethod - def store_if_new(cls, der, uri, retrieval): - self = cls.derRead(der) - aki = 
self.getAKI() - return RPKIObject.objects.get_or_create( - der = der, - defaults = dict( - uri = uri, - aki = "" if aki is None else aki.encode("hex"), - ski = "", - sha256 = sha256hex(der), - retrieved = retrieval)) - - @property - def uri(self): - return self.obj.uri - - @property - def aki(self): - return self.obj.aki - - @property - def ski(self): - return "" + def get_hex_SKI_AKI(self): + aki = self.getAKI() + return "", aki.encode("hex") if aki else "" @classmethod def load(cls, obj): @@ -332,37 +323,7 @@ class CRL(rpki.POW.CRL): return not any(s.kind == "bad" for s in status) -class CMS_Mixin(object): - - @classmethod - def store_if_new(cls, der, uri, retrieval): - self = cls.derRead(der) - cert = self.certs()[0] - aki = cert.getAKI() - ski = cert.getSKI() - return RPKIObject.objects.get_or_create( - der = der, - defaults = dict( - uri = uri, - aki = "" if aki is None else aki.encode("hex"), - ski = "" if ski is None else ski.encode("hex"), - sha256 = sha256hex(der), - retrieved = retrieval)) - - @property - def uri(self): - return self.obj.uri - - @property - def aki(self): - return self.obj.aki - - @property - def ski(self): - return self.obj.ski - - -class Ghostbuster(rpki.POW.CMS, CMS_Mixin): +class Ghostbuster(rpki.POW.CMS, POW_Mixin): def __repr__(self): try: @@ -391,7 +352,7 @@ class Ghostbuster(rpki.POW.CMS, CMS_Mixin): return not any(s.kind == "bad" for s in status) -class Manifest(rpki.POW.Manifest, CMS_Mixin): +class Manifest(rpki.POW.Manifest, POW_Mixin): def __repr__(self): try: @@ -441,7 +402,7 @@ class Manifest(rpki.POW.Manifest, CMS_Mixin): yield digest.encode("hex") -class ROA(rpki.POW.ROA, CMS_Mixin): +class ROA(rpki.POW.ROA, POW_Mixin): def __repr__(self): try: @@ -1033,7 +994,7 @@ class Fetcher(object): @tornado.gen.coroutine def _rrdp_fetch(self): - from django.db import transaction + from django.db import transaction, IntegrityError if args.no_fetch: return @@ -1082,6 +1043,9 @@ class Fetcher(object): root = None count = 0 + bulk_old = 
[] + bulk_new = [] + chunk = 10000 for event, node in iterparse(xml_file): if node is root: @@ -1120,24 +1084,78 @@ class Fetcher(object): # know that it would be faster, but it might, since read operations are probably faster than # reads interleaved with writes. # - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists + # http://stackoverflow.com/questions/9475241/split-python-string-every-nth-character + # https://docs.djangoproject.com/en/1.9/ref/models/fields/#manytomanyfield # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#get + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#values-list + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated # https://docs.djangoproject.com/en/1.9/ref/models/relations/ + # https://docs.djangoproject.com/en/1.9/topics/db/models/#many-to-many-relationships + # https://docs.djangoproject.com/en/1.9/topics/db/queries/#additional-methods-to-handle-related-objects + # https://docs.djangoproject.com/en/1.9/topics/db/queries/#backwards-related-objects + # https://docs.djangoproject.com/en/1.9/topics/db/queries/#limiting-querysets # # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh. # Well, maybe we can work around that, since "retrieval" already gives us the exact set of # objects for which we'd need to patch up the many-to-many relationship pointer. Depends # on what they mean by "doesn't work": automation not complete vs mysterious failure. 
- obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval) - obj.snapshot.add(snapshot) + der = node.text.decode("base64") + ski, aki = cls.derRead(der).get_hex_SKI_AKI() + bulk_new.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, + retrieved = retrieval, sha256 = sha256hex(der))) node.clear() while node.getprevious() is not None: del root[0] + if len(bulk_new) > chunk: + logger.debug("Bulk creation of new RPKIObjects") + try: + RPKIObject.objects.bulk_create(bulk_new) + except IntegrityError: + logger.debug("Some objects already existed, weeding and retrying") + i = 0 + while i < len(bulk_new): + try: + bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der)) + except RPKIObject.DoesNotExist: + i += 1 + else: + del bulk_new[i] + RPKIObject.objects.bulk_create(bulk_new) + del bulk_new[:] + yield tornado.gen.moment + if len(bulk_new) > 0: + logger.debug("Bulk creation of new RPKIObjects") + try: + RPKIObject.objects.bulk_create(bulk_new) + except IntegrityError: + logger.debug("Some objects already existed, weeding and retrying") + i = 0 + while i < len(bulk_new): + try: + bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der)) + except RPKIObject.DoesNotExist: + i += 1 + else: + del bulk_new[i] + RPKIObject.objects.bulk_create(bulk_new) + + n = len(bulk_old) + for i in xrange(0, n, chunk): + logger.debug("Bulk addition of existing RPKIObjects to RRDPSnapshot") + snapshot.rpkiobject_set.add(*bulk_old[i : i + chunk]) + + n = retrieval.rpkiobject_set.count() + for i in xrange(0, n, chunk): + logger.debug("Bulk addition of new RPKIObjects to RRDPSnapshot") + snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk])) + snapshot.retrieved = retrieval snapshot.save() |