aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/gui
diff options
context:
space:
mode:
authorMichael Elkins <melkins@tislabs.com>2012-01-19 00:35:52 +0000
committerMichael Elkins <melkins@tislabs.com>2012-01-19 00:35:52 +0000
commitedf891be7ebaf36182d27ad73a9e29e9ab4109d8 (patch)
tree85af57dbbd6e830ad7ec187ae9812a3e731b6346 /rpkid/rpki/gui
parent7735c9840baedceff3a5aa6c4d5a4b8fef400a3e (diff)
add new module range_list with implementation of a resource_set type object that collapses adjacent ranges and can compute missing ranges
svn path=/branches/tk161/; revision=4225
Diffstat (limited to 'rpkid/rpki/gui')
-rwxr-xr-xrpkid/rpki/gui/app/range_list.py222
-rw-r--r--rpkid/rpki/gui/app/views.py33
2 files changed, 239 insertions, 16 deletions
diff --git a/rpkid/rpki/gui/app/range_list.py b/rpkid/rpki/gui/app/range_list.py
new file mode 100755
index 00000000..76547555
--- /dev/null
+++ b/rpkid/rpki/gui/app/range_list.py
@@ -0,0 +1,222 @@
+import bisect
+import unittest
+
+class RangeList(list):
+ """A sorted list of ranges, which automatically merges adjacent ranges.
+
+ Items in the list are expected to have ".min" and ".max" attributes."""
+
+ def __init__(self, ini=None):
+ list.__init__(self)
+ if ini:
+ self.extend(ini)
+
+ def append(self, v):
+ keys = [x.min for x in self]
+
+ # lower bound
+ i = bisect.bisect_left(keys, v.min)
+
+ # upper bound
+ j = bisect.bisect_right(keys, v.max, lo=i)
+
+ # if the max value for the previous item is greater than v.min, include the previous item in the range to replace
+ # and use its min value. also include the previous item if the max value is 1 less than the min value for the
+ # inserted item
+ if i > 0 and self[i-1].max >= v.min - 1:
+ i = i - 1
+ vmin = self[i].min
+ else:
+ vmin = v.min
+
+ # if the max value for the previous item is greater than the max value for the new item, use the previous item's max
+ if j > 0 and self[j-1].max > v.max:
+ vmax = self[j-1].max
+ else:
+ vmax = v.max
+
+ # if the max value for the new item is 1 less than the min value for the next item, combine into a single item
+ if j < len(self) and vmax+1 == self[j].min:
+ vmax = self[j].max
+ j = j+1
+
+ # replace the range with a new object covering the entire range
+ self[i:j] = [v.__class__(min=vmin, max=vmax)]
+
+ def extend(self, args):
+ for x in args:
+ self.append(x)
+
+ def difference(self, other):
+ """Return a RangeList object which contains ranges in this object which are not in "other"."""
+ it = iter(other)
+
+ try:
+ cur = it.next()
+ except StopIteration:
+ return self
+
+ r = RangeList()
+
+ for x in self:
+ xmin = x.min
+
+ try:
+ while xmin <= x.max:
+ if xmin < cur.min:
+ r.append(x.__class__(min=xmin, max=min(x.max,cur.min-1)))
+ xmin = cur.max+1
+ elif xmin == cur.min:
+ xmin = cur.max+1
+ else: # xmin > cur.min
+ if xmin <= cur.max:
+ xmin = cur.max+1
+ else: # xmin > cur.max
+ cur = it.next()
+
+ except StopIteration:
+ r.append(x.__class__(min=xmin, max=x.max))
+
+ return r
+
+class TestRangeList(unittest.TestCase):
+ class MinMax(object):
+ def __init__(self, min, max):
+ self.min = min
+ self.max = max
+
+ def __str__(self):
+ return '(%d, %d)' % (self.min, self.max)
+
+ def __repr__(self):
+ return '<MinMax: (%d, %d)>' % (self.min, self.max)
+
+ def __eq__(self, other):
+ return self.min == other.min and self.max == other.max
+
+ def setUp(self):
+ self.v1 = TestRangeList.MinMax(1,2)
+ self.v2 = TestRangeList.MinMax(4,5)
+ self.v3 = TestRangeList.MinMax(7,8)
+ self.v4 = TestRangeList.MinMax(3,4)
+ self.v5 = TestRangeList.MinMax(2,3)
+ self.v6 = TestRangeList.MinMax(1,10)
+
+ def test_empty_append(self):
+ s = RangeList()
+ s.append(self.v1)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], self.v1)
+
+ def test_no_overlap(self):
+ s = RangeList()
+ s.append(self.v1)
+ s.append(self.v2)
+ self.assertTrue(len(s) == 2)
+ self.assertEqual(s[0], self.v1)
+ self.assertEqual(s[1], self.v2)
+
+ def test_no_overlap_prepend(self):
+ s = RangeList()
+ s.append(self.v2)
+ s.append(self.v1)
+ self.assertTrue(len(s) == 2)
+ self.assertEqual(s[0], self.v1)
+ self.assertEqual(s[1], self.v2)
+
+ def test_insert_middle(self):
+ s = RangeList()
+ s.append(self.v1)
+ s.append(self.v3)
+ s.append(self.v2)
+ self.assertTrue(len(s) == 3)
+ self.assertEqual(s[0], self.v1)
+ self.assertEqual(s[1], self.v2)
+ self.assertEqual(s[2], self.v3)
+
+ def test_append_overlap(self):
+ s = RangeList()
+ s.append(self.v1)
+ s.append(self.v5)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], TestRangeList.MinMax(1,3))
+
+ def test_combine_range(self):
+ s = RangeList()
+ s.append(self.v1)
+ s.append(self.v4)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], TestRangeList.MinMax(1,4))
+
+ def test_append_subset(self):
+ s = RangeList()
+ s.append(self.v6)
+ s.append(self.v3)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], self.v6)
+
+ def test_append_equal(self):
+ s = RangeList()
+ s.append(self.v6)
+ s.append(self.v6)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], self.v6)
+
+ def test_prepend_combine(self):
+ s = RangeList()
+ s.append(self.v4)
+ s.append(self.v1)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], TestRangeList.MinMax(1,4))
+
+ def test_append_aggregate(self):
+ s = RangeList()
+ s.append(self.v1)
+ s.append(self.v2)
+ s.append(self.v3)
+ s.append(self.v6)
+ self.assertTrue(len(s) == 1)
+ self.assertEqual(s[0], self.v6)
+
+ def test_diff_empty(self):
+ s = RangeList()
+ s.append(self.v1)
+ self.assertEqual(s, s.difference([]))
+
+ def test_diff_self(self):
+ s = RangeList()
+ s.append(self.v1)
+ self.assertEqual(s.difference(s), [])
+
+ def test_diff_middle(self):
+ s1 = RangeList([self.v6])
+ s2 = RangeList([self.v3])
+ self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,6), TestRangeList.MinMax(9, 10)]))
+
+ def test_diff_overlap(self):
+ s1 = RangeList([self.v2])
+ s2 = RangeList([self.v4])
+ self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(5,5)]))
+
+ def test_diff_overlap2(self):
+ s1 = RangeList([self.v2])
+ s2 = RangeList([self.v4])
+ self.assertEqual(s2.difference(s1), RangeList([TestRangeList.MinMax(3,3)]))
+
+ def test_diff_multi(self):
+ s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(4,5)])
+ s2 = RangeList([TestRangeList.MinMax(4,4)])
+ self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(5,5)]))
+
+ def test_diff_multi_overlap(self):
+ s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(3,4)])
+ s2 = RangeList([TestRangeList.MinMax(2,3)])
+ self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,1), TestRangeList.MinMax(4,4)]))
+
+ def test_diff_multi_overlap2(self):
+ s1 = RangeList([TestRangeList.MinMax(1,2), TestRangeList.MinMax(3,4), TestRangeList.MinMax(6,7)])
+ s2 = RangeList([TestRangeList.MinMax(2,3), TestRangeList.MinMax(6,6)])
+ self.assertEqual(s1.difference(s2), RangeList([TestRangeList.MinMax(1,1), TestRangeList.MinMax(4,4), TestRangeList.MinMax(7,7)]))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/rpkid/rpki/gui/app/views.py b/rpkid/rpki/gui/app/views.py
index b61c846d..37cca667 100644
--- a/rpkid/rpki/gui/app/views.py
+++ b/rpkid/rpki/gui/app/views.py
@@ -33,7 +33,7 @@ from django.views.generic.list_detail import object_list, object_detail
from django.views.generic.create_update import delete_object, update_object, create_object
from django.core.urlresolvers import reverse
-from rpki.gui.app import models, forms, glue, settings
+from rpki.gui.app import models, forms, glue, settings, range_list
from rpki import resource_set
import rpki.irdb
import rpki.exceptions
@@ -103,7 +103,7 @@ def dashboard(request, template_name='app/dashboard.html'):
conf = request.session['handle']
- used_asns = resource_set.resource_set_as()
+ used_asns = range_list.RangeList()
# asns used in my roas
roa_asns = set((obj.asn for obj in models.ROARequest.objects.filter(issuer=conf)))
@@ -113,39 +113,40 @@ def dashboard(request, template_name='app/dashboard.html'):
child_asns = rpki.irdb.models.ChildASN.objects.filter(child__in=conf.children.all())
used_asns.extend((resource_set.resource_range_as(obj.start_as, obj.end_as) for obj in child_asns))
- used_asns.canonize()
-
# my received asns
asns = models.ResourceRangeAS.objects.filter(cert__parent__issuer=conf)
- my_asns = resource_set.resource_set_as([resource_set.resource_range_as(obj.min, obj.max) for obj in asns])
+ my_asns = range_list.RangeList([resource_set.resource_range_as(obj.min, obj.max) for obj in asns])
unused_asns = my_asns.difference(used_asns)
- used_prefixes = resource_set.resource_set_ipv4()
- used_prefixes_v6 = resource_set.resource_set_ipv6()
+ used_prefixes = range_list.RangeList()
+ used_prefixes_v6 = range_list.RangeList()
# prefixes used in my roas
- used_prefixes.extend((obj.as_resource_range() for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv4')))
- used_prefixes_v6.extend((obj.as_resource_range() for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv6')))
+ for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv4'):
+ used_prefixes.append(obj.as_resource_range())
+
+ for obj in models.ROARequestPrefix.objects.filter(roa_request__issuer=conf, version='IPv6'):
+ used_prefixes_v6.append(obj.as_resource_range())
# prefixes given to my children
- used_prefixes.extend((obj.as_resource_range() for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv4')))
- used_prefixes_v6.extend((obj.as_resource_range() for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv6')))
+ for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv4'):
+ used_prefixes.append(obj.as_resource_range())
- used_prefixes.canonize()
- used_prefixes_v6.canonize()
+ for obj in rpki.irdb.models.ChildNet.objects.filter(child__in=conf.children.all(), version='IPv6'):
+ used_prefixes_v6.append(obj.as_resource_range())
# my received prefixes
prefixes = models.ResourceRangeAddressV4.objects.filter(cert__parent__issuer=conf)
prefixes_v6 = models.ResourceRangeAddressV6.objects.filter(cert__parent__issuer=conf)
- my_prefixes = resource_set.resource_set_ipv4([obj.as_resource_range() for obj in prefixes])
- my_prefixes_v6 = resource_set.resource_set_ipv6([obj.as_resource_range() for obj in prefixes_v6])
+ my_prefixes = range_list.RangeList([obj.as_resource_range() for obj in prefixes])
+ my_prefixes_v6 = range_list.RangeList([obj.as_resource_range() for obj in prefixes_v6])
unused_prefixes = my_prefixes.difference(used_prefixes)
unused_prefixes_v6 = my_prefixes_v6.difference(used_prefixes_v6)
return render(template_name, {
- 'conf': handle,
+ 'conf': conf,
'unused_asns': unused_asns,
'unused_prefixes': unused_prefixes,
'unused_prefixes_v6': unused_prefixes_v6,