about | summary | refs | log | tree | commit | diff
path: root/rp/rcynic/rcynicng
diff options
context:
space:
mode:
Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-x  rp/rcynic/rcynicng  110
1 files changed, 38 insertions, 72 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 0ef7c25b..78987073 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -993,8 +993,28 @@ class Fetcher(object):
raise tornado.gen.Return((retrieval, response, xml_file))
@tornado.gen.coroutine
+ def _rrdp_bulk_create(self, new_objs, existing_objs):
+ from django.db import IntegrityError
+ #logger.debug("Bulk creation of new RPKIObjects")
+ try:
+ RPKIObject.objects.bulk_create(new_objs)
+ except IntegrityError:
+ #logger.debug("Some objects already existed, weeding and retrying")
+ i = 0
+ while i < len(new_objs):
+ yield tornado.gen.moment
+ try:
+ existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der))
+ except RPKIObject.DoesNotExist:
+ i += 1
+ else:
+ del new_objs[i]
+ RPKIObject.objects.bulk_create(new_objs)
+ del new_objs[:]
+
+ @tornado.gen.coroutine
def _rrdp_fetch(self):
- from django.db import transaction, IntegrityError
+ from django.db import transaction
if args.no_fetch:
return
@@ -1041,11 +1061,15 @@ class Fetcher(object):
snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
+ # Value of "chunk" here probably needs to be configurable. Larger numbers batch more objects in
+ # a single bulk addition, which is faster ... unless one or more of them isn't really new, in
+ # which case we have to check everything in that batch when we get the IntegrityError, so
+ # the smaller the batch, the faster that check. No single good answer.
+
root = None
- count = 0
- bulk_old = []
- bulk_new = []
- chunk = 10000
+ existing_rpkiobjects = []
+ new_rpkiobjects = []
+ chunk = 2000 # 10000
for event, node in iterparse(xml_file):
if node is root:
@@ -1068,92 +1092,34 @@ class Fetcher(object):
or any(a != "uri" for a in node.attrib):
raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
- count += 1
- if count % 1000 == 0:
- logger.debug("Parsing %s object %s", url, count)
-
uri = node.get("uri")
cls = uri_to_class(uri)
if cls is None:
raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
- # This next bit, sadly, is hideously slow for large snapshots. Django does have a bulk
- # creation operation of sorts, but using it here would be trickly because the record may already
- # exist. It's possible that we can do smoething clever here using the .exists() queryset
- # operator to figure out whether we should add something to the bulk insertion. Don't really
- # know that it would be faster, but it might, since read operations are probably faster than
- # reads interleaved with writes.
- #
- # http://stackoverflow.com/questions/9475241/split-python-string-every-nth-character
- # https://docs.djangoproject.com/en/1.9/ref/models/fields/#manytomanyfield
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#get
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#values-list
- # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated
- # https://docs.djangoproject.com/en/1.9/ref/models/relations/
- # https://docs.djangoproject.com/en/1.9/topics/db/models/#many-to-many-relationships
- # https://docs.djangoproject.com/en/1.9/topics/db/queries/#additional-methods-to-handle-related-objects
- # https://docs.djangoproject.com/en/1.9/topics/db/queries/#backwards-related-objects
- # https://docs.djangoproject.com/en/1.9/topics/db/queries/#limiting-querysets
- #
- # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh.
- # Well, maybe we can work around that, since "retrieval" already gives us the exact set of
- # objects for which we'd need to patch up the many-to-many relationship pointer. Depends
- # on what they mean by "doesn't work": automation not complete vs mysterious failure.
-
der = node.text.decode("base64")
ski, aki = cls.derRead(der).get_hex_SKI_AKI()
- bulk_new.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
- retrieved = retrieval, sha256 = sha256hex(der)))
+ new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
+ retrieved = retrieval, sha256 = sha256hex(der)))
node.clear()
while node.getprevious() is not None:
del root[0]
- if len(bulk_new) > chunk:
- logger.debug("Bulk creation of new RPKIObjects")
- try:
- RPKIObject.objects.bulk_create(bulk_new)
- except IntegrityError:
- logger.debug("Some objects already existed, weeding and retrying")
- i = 0
- while i < len(bulk_new):
- try:
- bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der))
- except RPKIObject.DoesNotExist:
- i += 1
- else:
- del bulk_new[i]
- RPKIObject.objects.bulk_create(bulk_new)
- del bulk_new[:]
+ if len(new_rpkiobjects) > chunk:
+ yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
yield tornado.gen.moment
- if len(bulk_new) > 0:
- logger.debug("Bulk creation of new RPKIObjects")
- try:
- RPKIObject.objects.bulk_create(bulk_new)
- except IntegrityError:
- logger.debug("Some objects already existed, weeding and retrying")
- i = 0
- while i < len(bulk_new):
- try:
- bulk_old.append(RPKIObject.objects.values_list("pk", flat = True).get(der = bulk_new[i].der))
- except RPKIObject.DoesNotExist:
- i += 1
- else:
- del bulk_new[i]
- RPKIObject.objects.bulk_create(bulk_new)
-
- n = len(bulk_old)
+ if len(new_rpkiobjects) > 0:
+ yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
+
+ n = len(existing_rpkiobjects)
for i in xrange(0, n, chunk):
- logger.debug("Bulk addition of existing RPKIObjects to RRDPSnapshot")
- snapshot.rpkiobject_set.add(*bulk_old[i : i + chunk])
+ snapshot.rpkiobject_set.add(*existing_rpkiobjects[i : i + chunk])
n = retrieval.rpkiobject_set.count()
for i in xrange(0, n, chunk):
- logger.debug("Bulk addition of new RPKIObjects to RRDPSnapshot")
snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk]))
snapshot.retrieved = retrieval