aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Elkins <melkins@tislabs.com>2015-04-06 23:27:41 +0000
committerMichael Elkins <melkins@tislabs.com>2015-04-06 23:27:41 +0000
commit94b0f91257ce1a56b3da72a864db626f07702655 (patch)
tree0fb1aea30fb0c09ede3ce3223541805afd19beb6
parentf9b4a60de17585874aa87d08b30ca847887b4ad6 (diff)
refactor route dump importer to share code common to text and MRT dumps
svn path=/trunk/; revision=6074
-rwxr-xr-xca/rpkigui-import-routes8
-rw-r--r--rpki/gui/routeview/util.py248
2 files changed, 129 insertions, 127 deletions
diff --git a/ca/rpkigui-import-routes b/ca/rpkigui-import-routes
index edff57cd..0fbe0126 100755
--- a/ca/rpkigui-import-routes
+++ b/ca/rpkigui-import-routes
@@ -51,11 +51,9 @@ from routeviews.org into the RPKI Web Portal database. If the
input file is a bzip2 compressed file, it will be decompressed
automatically.""")
parser.add_option('-t', '--type', dest='filetype', metavar='TYPE',
- help='Specify the input file type (auto, text, mrt) [Default: %default]')
+ help='Specify the input file type (text, mrt) [Default: %default]')
parser.add_option('-l', '--level', dest='log_level', default='ERROR',
help='Set logging level [Default: %default]')
- parser.add_option('-u', '--bunzip2', dest='bunzip', metavar='PROG',
- help='Specify bunzip2 program to use')
parser.add_option('-b', '--bgpdump', dest='bgpdump', metavar='PROG',
                      help='Specify path to bgpdump binary')
parser.add_option('-j', '--jitter', dest='jitter', type='int',
@@ -64,7 +62,7 @@ automatically.""")
help='Set name of lock file; empty string disables locking [Default: %default]')
parser.add_option('--timeout', dest='timeout', type='int',
help='Specify timeout for download and import, in seconds [Default: %default]')
- parser.set_defaults(debug=False, verbose=False, filetype='auto', jitter=0,
+ parser.set_defaults(debug=False, verbose=False, filetype='text', jitter=0,
lockfile='/tmp/rpkigui-import-routes.lock', timeout=90*60)
options, args = parser.parse_args()
@@ -104,7 +102,7 @@ automatically.""")
signal.signal(signal.SIGALRM, timed_out)
signal.setitimer(signal.ITIMER_REAL, options.timeout)
- import_routeviews_dump(*args)
+ import_routeviews_dump(*args, filetype=options.filetype)
if options.timeout > 0:
signal.setitimer(signal.ITIMER_REAL, 0)
diff --git a/rpki/gui/routeview/util.py b/rpki/gui/routeview/util.py
index 6b6a0a9b..1340e9fa 100644
--- a/rpki/gui/routeview/util.py
+++ b/rpki/gui/routeview/util.py
@@ -22,6 +22,7 @@ import subprocess
import time
import logging
import urlparse
+import bz2
from urllib import urlretrieve, unquote
from django.db import transaction, connection
@@ -37,34 +38,118 @@ logger = logging.getLogger(__name__)
 # Eventually this can be retrieved from rpki.conf
DEFAULT_URL = 'http://archive.routeviews.org/oix-route-views/oix-full-snapshot-latest.dat.bz2'
-def parse_text(f):
- last_prefix = None
- cursor = connection.cursor()
- range_class = resource_range_ipv4
+class ParseError(Exception): pass
+
+class RouteDumpParser(object):
+ """Base class for parsing various route dump formats."""
+
table = 'routeview_routeorigin'
sql = "INSERT INTO %s_new SET asn=%%s, prefix_min=%%s, prefix_max=%%s" % table
+ range_class = resource_range_ipv4
- try:
- logger.info('Dropping existing staging table...')
- cursor.execute('DROP TABLE IF EXISTS %s_new' % table)
- except _mysql_exceptions.Warning:
- pass
+ def __init__(self, path, *args, **kwargs):
+ self.path = path
+ self.cursor = connection.cursor()
+ self.last_prefix = None
+ self.asns = set()
+
+ def parse(self):
+ try:
+ logger.info('Dropping existing staging table...')
+ self.cursor.execute('DROP TABLE IF EXISTS %s_new' % self.table)
+ except _mysql_exceptions.Warning:
+ pass
+
+ logger.info('Creating staging table...')
+ self.cursor.execute('CREATE TABLE %(table)s_new LIKE %(table)s' % {'table': self.table})
+
+ logger.info('Disabling autocommit...')
+ self.cursor.execute('SET autocommit=0')
+
+ logger.info('Adding rows to table...')
+ for line in self.input:
+ try:
+ prefix, origin_as = self.parse_line(line)
+ except ParseError as e:
+ logger.warning('error while parsing line: {} ({})'.format(line, str(e)))
+ continue
+
+ # the output may contain multiple paths to the same origin.
+ # if this is the same prefix as the last entry, we don't need
+ # to validate it again.
+ #
+ # prefixes are sorted, but the origin_as is not, so we keep a set to
+ # avoid duplicates, and insert into the db once we've seen all the
+ # origin_as values for a given prefix
+ if prefix != self.last_prefix:
+ self.ins_routes()
+ self.last_prefix = prefix
+ self.asns.add(origin_as)
+
+ self.ins_routes() # process data from last line
+
+ logger.info('Committing...')
+ self.cursor.execute('COMMIT')
- logger.info('Creating staging table...')
- cursor.execute('CREATE TABLE %(table)s_new LIKE %(table)s' % {'table': table})
+ try:
+ logger.info('Dropping old table...')
+ self.cursor.execute('DROP TABLE IF EXISTS %s_old' % self.table)
+ except _mysql_exceptions.Warning:
+ pass
+
+ logger.info('Swapping staging table with live table...')
+ self.cursor.execute('RENAME TABLE %(table)s TO %(table)s_old, %(table)s_new TO %(table)s' % {'table': self.table})
+
+ self.cleanup() # allow cleanup function to throw prior to COMMIT
+
+ transaction.commit_unless_managed()
- logger.info('Disabling autocommit...')
- cursor.execute('SET autocommit=0')
+ logger.info('Updating timestamp metadata...')
+ rpki.gui.app.timestamp.update('bgp_v4_import')
- logger.info('Adding rows to table...')
- for row in itertools.islice(f, 5, None):
+ def parse_line(self, row):
+ "Parse one line of input. Return a (prefix, origin_as) tuple."
+ return None
+
+ def cleanup(self):
+ pass
+
+ def ins_routes(self):
+ # output routes for previous prefix
+ if self.last_prefix is not None:
+ try:
+ rng = self.range_class.parse_str(self.last_prefix)
+ rmin = long(rng.min)
+ rmax = long(rng.max)
+ self.cursor.executemany(self.sql, [(asn, rmin, rmax) for asn in self.asns])
+ except BadIPResource:
+ logger.warning('skipping bad prefix: ' + self.last_prefix)
+ self.asns = set() # reset
+
+
+class TextDumpParser(RouteDumpParser):
+ """Parses the RouteViews.org text dump."""
+
+ def __init__(self, *args, **kwargs):
+ super(TextDumpParser, self).__init__(*args, **kwargs)
+ if self.path.endswith('.bz2'):
+ logger.info('decompressing bz2 file')
+ self.file = bz2.BZ2File(self.path, buffering=4096)
+ else:
+ self.file = open(self.path, buffering=-1)
+ self.input = itertools.islice(self.file, 5, None) # skip first 5 lines
+
+ def parse_line(self, row):
+ "Parse one line of input"
cols = row.split()
# index -1 is i/e/? for igp/egp
- origin_as = cols[-2]
- # FIXME: skip AS_SETs
- if origin_as[0] == '{':
- continue
+ try:
+ origin_as = int(cols[-2])
+ except IndexError:
+ raise ParseError('unexpected format')
+ except ValueError:
+ raise ParseError('bad AS value')
prefix = cols[1]
@@ -78,85 +163,35 @@ def parse_text(f):
s.append('assuming it should be %s' % prefix)
logger.warning(' '.join(s))
- # the output may contain multiple paths to the same origin.
- # if this is the same prefix as the last entry, we don't need
- # to validate it again.
- #
- # prefixes are sorted, but the origin_as is not, so we keep a set to
- # avoid duplicates, and insert into the db once we've seen all the
- # origin_as values for a given prefix
- if prefix != last_prefix:
- # output routes for previous prefix
- if last_prefix is not None:
- try:
- rng = range_class.parse_str(last_prefix)
- rmin = long(rng.min)
- rmax = long(rng.max)
- cursor.executemany(sql, [(asn, rmin, rmax) for asn in asns])
- except BadIPResource:
- logger.warning('skipping bad prefix: ' + last_prefix)
-
- asns = set()
- last_prefix = prefix
-
- try:
- asns.add(int(origin_as))
- except ValueError as err:
- logger.warning('\n'.join(
- ['unable to parse origin AS: ' + origin_as],
- ['ValueError: ' + str(err)]
- ['route entry was: ' + row],
- ))
-
- logger.info('Committing...')
- cursor.execute('COMMIT')
+ return prefix, origin_as
- try:
- logger.info('Dropping old table...')
- cursor.execute('DROP TABLE IF EXISTS %s_old' % table)
- except _mysql_exceptions.Warning:
- pass
+ def cleanup(self):
+ self.file.close()
- logger.info('Swapping staging table with live table...')
- cursor.execute('RENAME TABLE %(table)s TO %(table)s_old, %(table)s_new TO %(table)s' % {'table': table})
- transaction.commit_unless_managed()
+class MrtDumpParser(RouteDumpParser):
+ def __init__(self, *args, **kwargs):
+ super(MrtDumpParser, self).__init__(*args, **kwargs)
+ # filter input through bgpdump
+ # bgpdump can decompress bz2 files directly, no need to do it here
+ self.pipe = subprocess.Popen(['bgpdump', '-m', '-v', self.path], stdout=subprocess.PIPE, bufsize=-1)
+ self.input = self.pipe.stdout
- logger.info('Updating timestamp metadata...')
- rpki.gui.app.timestamp.update('bgp_v4_import')
-
-
-def parse_mrt(f):
- # filter input through bgpdump
- pipe = subprocess.Popen(['bgpdump', '-m', '-v', '-'], stdin=f,
- stdout=subprocess.PIPE)
-
- last_prefix = None
- last_as = None
- for e in pipe.stdout.readlines():
- a = e.split('|')
+ def parse_line(self, row):
+ a = row.split('|')
prefix = a[5]
try:
origin_as = int(a[6].split()[-1])
except ValueError:
- # skip AS_SETs
- continue
-
- if prefix != last_prefix:
- last_prefix = prefix
- elif last_as == origin_as:
- continue
- last_as = origin_as
+ raise ParseError('bad AS value')
- asns = PREFIXES.get(prefix)
- if not asns:
- asns = set()
- PREFIXES[prefix] = asns
- asns.add(origin_as)
+ return prefix, origin_as
- pipe.wait()
- if pipe.returncode:
- raise ProgException('bgpdump exited with code %d' % pipe.returncode)
+ def cleanup(self):
+ logger.info('waiting for child process to terminate')
+ self.pipe.wait()
+ if self.pipe.returncode:
+ raise PipeFailed('bgpdump exited with code %d' % self.pipe.returncode)
class ProgException(Exception):
@@ -171,7 +206,7 @@ class PipeFailed(ProgException):
pass
-def import_routeviews_dump(filename=DEFAULT_URL, filetype='auto'):
+def import_routeviews_dump(filename=DEFAULT_URL, filetype='text'):
"""Load the oix-full-snapshot-latest.bz2 from routeview.org into the
rpki.gui.routeview database.
@@ -199,43 +234,12 @@ def import_routeviews_dump(filename=DEFAULT_URL, filetype='auto'):
# URL
filename, headers = urlretrieve(filename, tmpname)
- if filetype == 'auto':
- # try to determine input type from filename, based on the default
- # filenames from archive.routeviews.org
- bname = os.path.basename(filename)
- if bname.startswith('oix-full-snapshot-latest'):
- filetype = 'text'
- elif bname.startswith('rib.'):
- filetype = 'mrt'
- else:
- raise UnknownInputType('unable to automatically determine input file type')
- logging.info('Detected import format as "%s"', filetype)
-
- pipe = None
- if filename.endswith('.bz2'):
- bunzip = 'bunzip2'
- logging.info('Decompressing input file on the fly...')
- pipe = subprocess.Popen([bunzip, '--stdout', filename],
- stdout=subprocess.PIPE,
- bufsize=-1)
- input_file = pipe.stdout
- else:
- input_file = open(filename)
-
try:
- dispatch = {'text': parse_text, 'mrt': parse_mrt}
- dispatch[filetype](input_file)
+ dispatch = {'text': TextDumpParser, 'mrt': MrtDumpParser}
+ dispatch[filetype](filename).parse()
except KeyError:
raise UnknownInputType('"%s" is an unknown input file type' % filetype)
- if pipe:
- logging.debug('Waiting for child to exit...')
- pipe.wait()
- if pipe.returncode:
- raise PipeFailed('Child exited code %d' % pipe.returncode)
- pipe = None
- else:
- input_file.close()
finally:
# make sure to always clean up the temp download file
if tmpname is not None: