diff options
author | Rob Austein <sra@hactrn.net> | 2016-03-01 04:36:58 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-03-01 04:36:58 +0000 |
commit | 8ae577caff193627ea5d5433afff7643318c1f58 (patch) | |
tree | 8d58a37ca9f7721b65429ec89287db92035760d2 /rp | |
parent | ad8a1b6b8b91dcc92394a83ef9cd3a4ce66ef68a (diff) |
Somewhat cleaner version of bulk snapshot loader.
svn path=/branches/tk705/; revision=6305
Diffstat (limited to 'rp')
-rwxr-xr-x | rp/rcynic/rcynicng | 110 |
1 file changed, 38 insertions, 72 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 0ef7c25b..78987073 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -993,8 +993,28 @@ class Fetcher(object): raise tornado.gen.Return((retrieval, response, xml_file)) @tornado.gen.coroutine + def _rrdp_bulk_create(self, new_objs, existing_objs): + from django.db import IntegrityError + #logger.debug("Bulk creation of new RPKIObjects") + try: + RPKIObject.objects.bulk_create(new_objs) + except IntegrityError: + #logger.debug("Some objects already existed, weeding and retrying") + i = 0 + while i < len(new_objs): + yield tornado.gen.moment + try: + existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der)) + except RPKIObject.DoesNotExist: + i += 1 + else: + del new_objs[i] + RPKIObject.objects.bulk_create(new_objs) + del new_objs[:] + + @tornado.gen.coroutine def _rrdp_fetch(self): - from django.db import transaction, IntegrityError + from django.db import transaction if args.no_fetch: return @@ -1041,11 +1061,15 @@ class Fetcher(object): snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) + # Value of "chunk" here probably needs to be configurable. Larger numbers batch more objects in + # a single bulk addition, which is faster ... unless one or more of them isn't really new, in + # which case we have to check everything in that batch when we get the IntegrityError, so + # the smaller the batch, the faster that check. No single good answer. 
+ root = None - count = 0 - bulk_old = [] - bulk_new = [] - chunk = 10000 + existing_rpkiobjects = [] + new_rpkiobjects = [] + chunk = 2000 # 10000 for event, node in iterparse(xml_file): if node is root: @@ -1068,92 +1092,34 @@ class Fetcher(object): or any(a != "uri" for a in node.attrib): raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url)) - count += 1 - if count % 1000 == 0: - logger.debug("Parsing %s object %s", url, count) - uri = node.get("uri") cls = uri_to_class(uri) if cls is None: raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) - # This next bit, sadly, is hideously slow for large snapshots. Django does have a bulk - # creation operation of sorts, but using it here would be trickly because the record may already - # exist. It's possible that we can do smoething clever here using the .exists() queryset - # operator to figure out whether we should add something to the bulk insertion. Don't really - # know that it would be faster, but it might, since read operations are probably faster than - # reads interleaved with writes. 
- # - # http://stackoverflow.com/questions/9475241/split-python-string-every-nth-character - # https://docs.djangoproject.com/en/1.9/ref/models/fields/#manytomanyfield - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#get - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#values-list - # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated - # https://docs.djangoproject.com/en/1.9/ref/models/relations/ - # https://docs.djangoproject.com/en/1.9/topics/db/models/#many-to-many-relationships - # https://docs.djangoproject.com/en/1.9/topics/db/queries/#additional-methods-to-handle-related-objects - # https://docs.djangoproject.com/en/1.9/topics/db/queries/#backwards-related-objects - # https://docs.djangoproject.com/en/1.9/topics/db/queries/#limiting-querysets - # - # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh. - # Well, maybe we can work around that, since "retrieval" already gives us the exact set of - # objects for which we'd need to patch up the many-to-many relationship pointer. Depends - # on what they mean by "doesn't work": automation not complete vs mysterious failure. 
- der = node.text.decode("base64") ski, aki = cls.derRead(der).get_hex_SKI_AKI() - bulk_new.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, - retrieved = retrieval, sha256 = sha256hex(der))) + new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, + retrieved = retrieval, sha256 = sha256hex(der))) node.clear() while node.getprevious() is not None: del root[0] - if len(bulk_new) > chunk: - logger.debug("Bulk creation of new RPKIObjects") - try: - RPKIObject.objects.bulk_create(bulk_new) - except IntegrityError: - logger.debug("Some objects already existed, weeding and retrying") - i = 0 - while i < len(bulk_new): - try: - bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der)) - except RPKIObject.DoesNotExist: - i += 1 - else: - del bulk_new[i] - RPKIObject.objects.bulk_create(bulk_new) - del bulk_new[:] + if len(new_rpkiobjects) > chunk: + yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) yield tornado.gen.moment - if len(bulk_new) > 0: - logger.debug("Bulk creation of new RPKIObjects") - try: - RPKIObject.objects.bulk_create(bulk_new) - except IntegrityError: - logger.debug("Some objects already existed, weeding and retrying") - i = 0 - while i < len(bulk_new): - try: - bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der)) - except RPKIObject.DoesNotExist: - i += 1 - else: - del bulk_new[i] - RPKIObject.objects.bulk_create(bulk_new) - - n = len(bulk_old) + if len(new_rpkiobjects) > 0: + yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) + + n = len(existing_rpkiobjects) for i in xrange(0, n, chunk): - logger.debug("Bulk addition of existing RPKIObjects to RRDPSnapshot") - snapshot.rpkiobject_set.add(*bulk_old[i : i + chunk]) + snapshot.rpkiobject_set.add(*existing_rpkiobjects[i : i + chunk]) n = retrieval.rpkiobject_set.count() for i in xrange(0, n, chunk): - logger.debug("Bulk addition of new RPKIObjects to 
RRDPSnapshot") snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk])) snapshot.retrieved = retrieval |