diff options
author | Michael Elkins <melkins@tislabs.com> | 2012-01-19 00:35:52 +0000 |
---|---|---|
committer | Michael Elkins <melkins@tislabs.com> | 2012-01-19 00:35:52 +0000 |
commit | edf891be7ebaf36182d27ad73a9e29e9ab4109d8 (patch) | |
tree | 85af57dbbd6e830ad7ec187ae9812a3e731b6346 /rpkid/rpki/gui | |
parent | 7735c9840baedceff3a5aa6c4d5a4b8fef400a3e (diff) |
add new module range_list with implementation of a resource_set type object that collapses adjacent ranges and can compute missing ranges
svn path=/branches/tk161/; revision=4225
Diffstat (limited to 'rpkid/rpki/gui')
-rwxr-xr-x | rpkid/rpki/gui/app/range_list.py | 222 | ||||
-rw-r--r-- | rpkid/rpki/gui/app/views.py | 33 |
2 files changed, 239 insertions, 16 deletions
diff --git a/rpkid/rpki/gui/app/range_list.py b/rpkid/rpki/gui/app/range_list.py new file mode 100755 index 00000000..76547555 --- /dev/null +++ b/rpkid/rpki/gui/app/range_list.py @@ -0,0 +1,222 @@ +import bisect +import unittest + +class RangeList(list): + """A sorted list of ranges, which automatically merges adjacent ranges. + + Items in the list are expected to have ".min" and ".max" attributes.""" + + def __init__(self, ini=None): + list.__init__(self) + if ini: + self.extend(ini) + + def append(self, v): + keys = [x.min for x in self] + + # lower bound + i = bisect.bisect_left(keys, v.min) + + # upper bound + j = bisect.bisect_right(keys, v.max, lo=i) + + # if the max value for the previous item is greater than v.min, include the previous item in the range to replace + # and use its min value. also include the previous item if the max value is 1 less than the min value for the + # inserted item + if i > 0 and self[i-1].max >= v.min - 1: + i = i - 1 + vmin = self[i].min + else: + vmin = v.min + + # if the max value for the previous item is greater than the max value for the new item, use the previous item's max + if j > 0 and self[j-1].max > v.max: + vmax = self[j-1].max + else: + vmax = v.max + + # if the max value for the new item is 1 less than the min value for the next item, combine into a single item + if j < len(self) and vmax+1 == self[j].min: + vmax = self[j].max + j = j+1 + + # replace the range with a new object covering the entire range + self[i:j] = [v.__class__(min=vmin, max=vmax)] + + def extend(self, args): + for x in args: + self.append(x) + + def difference(self, other): + """Return a RangeList object which contains ranges in this object which are not in "other".""" + it = iter(other) + + try: + cur = it.next() + except StopIteration: + return self + + r = RangeList() + + for x in self: + xmin = x.min + + try: + while xmin <= x.max: + if xmin < cur.min: + r.append(x.__class__(min=xmin, max=min(x.max,cur.min-1))) + xmin = cur.max+1 + elif xmin == cur.min: + xmin = cur.max+1 + else: # xmin > cur.min + if xmin <= cur.max: + xmin = cur.max+1 + else: # xmin > cur.max + cur = it.next() + + except StopIteration: + r.append(x.__class__(min=xmin, max=x.max)) + + return r + +class TestRangeList(unittest.TestCase): + class MinMax(object): + def __init__(self, min, max): + self.min = min + self.max = max + + def __str__(self): + return '(%d, %d)' % (self.min, self.max) + + def __repr__(self): + return '<MinMax: (%d, %d)>' % (self.min, self.max) + + def __eq__(self, other): + return self.min == other.min and self.max == other.max + + def setUp(self): + self.v1 = TestRangeList.MinMax(1,2) + self.v2 = TestRangeList.MinMax(4,5) + self.v3 = TestRangeList.MinMax(7,8) + self.v4 = TestRangeList.MinMax(3,4) + self.v5 = TestRangeList.MinMax(2,3) + self.v6 = TestRangeList.MinMax(1,10) + + def test_empty_append(self): + s = RangeList() + s.append(self.v1) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], self.v1) + + def test_no_overlap(self): + s = RangeList() + s.append(self.v1) + s.append(self.v2) + self.assertTrue(len(s) == 2) + self.assertEqual(s[0], self.v1) + self.assertEqual(s[1], self.v2) + + def test_no_overlap_prepend(self): + s = RangeList() + s.append(self.v2) + s.append(self.v1) + self.assertTrue(len(s) == 2) + self.assertEqual(s[0], self.v1) + self.assertEqual(s[1], self.v2) + + def test_insert_middle(self): + s = RangeList() + s.append(self.v1) + s.append(self.v3) + s.append(self.v2) + self.assertTrue(len(s) == 3) + self.assertEqual(s[0], self.v1) + self.assertEqual(s[1], self.v2) + self.assertEqual(s[2], self.v3) + + def test_append_overlap(self): + s = RangeList() + s.append(self.v1) + s.append(self.v5) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], TestRangeList.MinMax(1,3)) + + def test_combine_range(self): + s = RangeList() + s.append(self.v1) + s.append(self.v4) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], TestRangeList.MinMax(1,4)) + + def test_append_subset(self): + s = RangeList() + s.append(self.v6) + s.append(self.v3) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], self.v6) + + def test_append_equal(self): + s = RangeList() + s.append(self.v6) + s.append(self.v6) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], self.v6) + + def test_prepend_combine(self): + s = RangeList() + s.append(self.v4) + s.append(self.v1) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], TestRangeList.MinMax(1,4)) + + def test_append_aggregate(self): + s = RangeList() + s.append(self.v1) + s.append(self.v2) + s.append(self.v3) + s.append(self.v6) + self.assertTrue(len(s) == 1) + self.assertEqual(s[0], self.v6) + + def test_diff_empty(self): + s = RangeList() + s.append(self.v1) + self.assertEqual(s, s.difference([])) + + def test_diff_self(self): + s = RangeList() + s.append(self.v1) + self.assertEqual(s.difference(s), []) + + def test_diff_middle(self): + s1 = RangeList([self.v6]) + s2 = RangeList([self.v3]) + self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,6), TestRangeList.MinMax(9, 10)])) + + def test_diff_overlap(self): + s1 = RangeList([self.v2]) + s2 = RangeList([self.v4]) + self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(5,5)])) + + def test_diff_overlap2(self): + s1 = RangeList([self.v2]) + s2 = RangeList([self.v4]) + self.assertEqual(s2.difference(s1), RangeList([TestRangeList.MinMax(3,3)])) + + def test_diff_multi(self): + s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(4,5)]) + s2 = RangeList([TestRangeList.MinMax(4,4)]) + self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(5,5)])) + + def test_diff_multi_overlap(self): + s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(3,4)]) + s2 = RangeList([TestRangeList.MinMax(2,3)]) + self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,1), TestRangeList.MinMax(4,4)])) + + def test_diff_multi_overlap2(self): + s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(3,4), TestRangeList.MinMax(6,7)]) + s2 = RangeList([TestRangeList.MinMax(2,3), TestRangeList.MinMax(6,6)]) + self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,1), TestRangeList.MinMax(4,4), TestRangeList.MinMax(7,7)])) + +if __name__ == '__main__': + unittest.main() diff --git a/rpkid/rpki/gui/app/views.py b/rpkid/rpki/gui/app/views.py index b61c846d..37cca667 100644 --- a/rpkid/rpki/gui/app/views.py +++ b/rpkid/rpki/gui/app/views.py @@ -33,7 +33,7 @@ from django.views.generic.list_detail import object_list, object_detail from django.views.generic.create_update import delete_object, update_object, create_object from django.core.urlresolvers import reverse -from rpki.gui.app import models, forms, glue, settings +from rpki.gui.app import models, forms, glue, settings, range_list from rpki import resource_set import rpki.irdb import rpki.exceptions @@ -103,7 +103,7 @@ def dashboard(request, template_name='app/dashboard.html'): conf = request.session['handle'] - used_asns = resource_set.resource_set_as() + used_asns = range_list.RangeList() # asns used in my roas roa_asns = set((obj.asn for obj in models.ROARequest.objects.filter(issuer=conf))) @@ -113,39 +113,40 @@ def dashboard(request, template_name='app/dashboard.html'): child_asns = rpki.irdb.models.ChildASN.objects.filter(child__in=conf.children.all()) used_asns.extend((resource_set.resource_range_as(obj.start_as, obj.end_as) for obj in child_asns)) - used_asns.canonize() - # my received asns asns = models.ResourceRangeAS.objects.filter(cert__parent__issuer=conf) - my_asns = resource_set.resource_set_as([resource_set.resource_range_as(obj.min, obj.max) for obj in asns]) + my_asns = range_list.RangeList([resource_set.resource_range_as(obj.min, obj.max) for obj in asns]) unused_asns = my_asns.difference(used_asns) - used_prefixes = resource_set.resource_set_ipv4() - used_prefixes_v6 = resource_set.resource_set_ipv6() + used_prefixes = range_list.RangeList() + used_prefixes_v6 = range_list.RangeList() # prefixes used in my roas - used_prefixes.extend((obj.as_resource_range() for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv4'))) - used_prefixes_v6.extend((obj.as_resource_range() for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv6'))) + for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv4'): + used_prefixes.append(obj.as_resource_range()) + + for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv6'): + used_prefixes_v6.append(obj.as_resource_range()) # prefixes given to my children - used_prefixes.extend((obj.as_resource_range() for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv4'))) - used_prefixes_v6.extend((obj.as_resource_range() for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv6'))) + for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv4'): + used_prefixes.append(obj.as_resource_range()) - used_prefixes.canonize() - used_prefixes_v6.canonize() + for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv6'): + used_prefixes_v6.append(obj.as_resource_range()) # my received prefixes prefixes = models.ResourceRangeAddressV4.objects.filter(cert__parent__issuer=conf) prefixes_v6 = models.ResourceRangeAddressV6.objects.filter(cert__parent__issuer=conf) - my_prefixes = resource_set.resource_set_ipv4([obj.as_resource_range() for obj in prefixes]) - my_prefixes_v6 = resource_set.resource_set_ipv6([obj.as_resource_range() for obj in prefixes_v6]) + my_prefixes = range_list.RangeList([obj.as_resource_range() for obj in prefixes]) + my_prefixes_v6 = range_list.RangeList([obj.as_resource_range() for obj in prefixes_v6]) unused_prefixes = my_prefixes.difference(used_prefixes) unused_prefixes_v6 = my_prefixes_v6.difference(used_prefixes_v6) return render(template_name, { - 'conf': handle, + 'conf': conf, 'unused_asns': unused_asns, 'unused_prefixes': unused_prefixes, 'unused_prefixes_v6': unused_prefixes_v6, |