diff options
author | Michael Elkins <melkins@tislabs.com> | 2015-04-06 23:27:41 +0000 |
---|---|---|
committer | Michael Elkins <melkins@tislabs.com> | 2015-04-06 23:27:41 +0000 |
commit | 94b0f91257ce1a56b3da72a864db626f07702655 (patch) | |
tree | 0fb1aea30fb0c09ede3ce3223541805afd19beb6 | |
parent | f9b4a60de17585874aa87d08b30ca847887b4ad6 (diff) |
refactor route dump importer to share code common to text and MRT dumps
svn path=/trunk/; revision=6074
-rwxr-xr-x | ca/rpkigui-import-routes | 8 | ||||
-rw-r--r-- | rpki/gui/routeview/util.py | 248 |
2 files changed, 129 insertions, 127 deletions
diff --git a/ca/rpkigui-import-routes b/ca/rpkigui-import-routes index edff57cd..0fbe0126 100755 --- a/ca/rpkigui-import-routes +++ b/ca/rpkigui-import-routes @@ -51,11 +51,9 @@ from routeviews.org into the RPKI Web Portal database. If the input file is a bzip2 compressed file, it will be decompressed automatically.""") parser.add_option('-t', '--type', dest='filetype', metavar='TYPE', - help='Specify the input file type (auto, text, mrt) [Default: %default]') + help='Specify the input file type (text, mrt) [Default: %default]') parser.add_option('-l', '--level', dest='log_level', default='ERROR', help='Set logging level [Default: %default]') - parser.add_option('-u', '--bunzip2', dest='bunzip', metavar='PROG', - help='Specify bunzip2 program to use') parser.add_option('-b', '--bgpdump', dest='bgpdump', metavar='PROG', help='Specify path to bgdump binary') parser.add_option('-j', '--jitter', dest='jitter', type='int', @@ -64,7 +62,7 @@ automatically.""") help='Set name of lock file; empty string disables locking [Default: %default]') parser.add_option('--timeout', dest='timeout', type='int', help='Specify timeout for download and import, in seconds [Default: %default]') - parser.set_defaults(debug=False, verbose=False, filetype='auto', jitter=0, + parser.set_defaults(debug=False, verbose=False, filetype='text', jitter=0, lockfile='/tmp/rpkigui-import-routes.lock', timeout=90*60) options, args = parser.parse_args() @@ -104,7 +102,7 @@ automatically.""") signal.signal(signal.SIGALRM, timed_out) signal.setitimer(signal.ITIMER_REAL, options.timeout) - import_routeviews_dump(*args) + import_routeviews_dump(*args, filetype=options.filetype) if options.timeout > 0: signal.setitimer(signal.ITIMER_REAL, 0) diff --git a/rpki/gui/routeview/util.py b/rpki/gui/routeview/util.py index 6b6a0a9b..1340e9fa 100644 --- a/rpki/gui/routeview/util.py +++ b/rpki/gui/routeview/util.py @@ -22,6 +22,7 @@ import subprocess import time import logging import urlparse +import bz2 from urllib import urlretrieve, unquote from django.db import transaction, connection @@ -37,34 +38,118 @@ logger = logging.getLogger(__name__) # Eventually this can be retrived from rpki.conf DEFAULT_URL = 'http://archive.routeviews.org/oix-route-views/oix-full-snapshot-latest.dat.bz2' -def parse_text(f): - last_prefix = None - cursor = connection.cursor() - range_class = resource_range_ipv4 +class ParseError(Exception): pass + +class RouteDumpParser(object): + """Base class for parsing various route dump formats.""" + table = 'routeview_routeorigin' sql = "INSERT INTO %s_new SET asn=%%s, prefix_min=%%s, prefix_max=%%s" % table + range_class = resource_range_ipv4 - try: - logger.info('Dropping existing staging table...') - cursor.execute('DROP TABLE IF EXISTS %s_new' % table) - except _mysql_exceptions.Warning: - pass + def __init__(self, path, *args, **kwargs): + self.path = path + self.cursor = connection.cursor() + self.last_prefix = None + self.asns = set() + + def parse(self): + try: + logger.info('Dropping existing staging table...') + self.cursor.execute('DROP TABLE IF EXISTS %s_new' % self.table) + except _mysql_exceptions.Warning: + pass + + logger.info('Creating staging table...') + self.cursor.execute('CREATE TABLE %(table)s_new LIKE %(table)s' % {'table': self.table}) + + logger.info('Disabling autocommit...') + self.cursor.execute('SET autocommit=0') + + logger.info('Adding rows to table...') + for line in self.input: + try: + prefix, origin_as = self.parse_line(line) + except ParseError as e: + logger.warning('error while parsing line: {} ({})'.format(line, str(e))) + continue + + # the output may contain multiple paths to the same origin. + # if this is the same prefix as the last entry, we don't need + # to validate it again. + # + # prefixes are sorted, but the origin_as is not, so we keep a set to + # avoid duplicates, and insert into the db once we've seen all the + # origin_as values for a given prefix + if prefix != self.last_prefix: + self.ins_routes() + self.last_prefix = prefix + self.asns.add(origin_as) + + self.ins_routes() # process data from last line + + logger.info('Committing...') + self.cursor.execute('COMMIT') - logger.info('Creating staging table...') - cursor.execute('CREATE TABLE %(table)s_new LIKE %(table)s' % {'table': table}) + try: + logger.info('Dropping old table...') + self.cursor.execute('DROP TABLE IF EXISTS %s_old' % self.table) + except _mysql_exceptions.Warning: + pass + + logger.info('Swapping staging table with live table...') + self.cursor.execute('RENAME TABLE %(table)s TO %(table)s_old, %(table)s_new TO %(table)s' % {'table': self.table}) + + self.cleanup() # allow cleanup function to throw prior to COMMIT + + transaction.commit_unless_managed() - logger.info('Disabling autocommit...') - cursor.execute('SET autocommit=0') + logger.info('Updating timestamp metadata...') + rpki.gui.app.timestamp.update('bgp_v4_import') - logger.info('Adding rows to table...') - for row in itertools.islice(f, 5, None): + def parse_line(self, row): + "Parse one line of input. Return a (prefix, origin_as) tuple." + return None + + def cleanup(self): + pass + + def ins_routes(self): + # output routes for previous prefix + if self.last_prefix is not None: + try: + rng = self.range_class.parse_str(self.last_prefix) + rmin = long(rng.min) + rmax = long(rng.max) + self.cursor.executemany(self.sql, [(asn, rmin, rmax) for asn in self.asns]) + except BadIPResource: + logger.warning('skipping bad prefix: ' + self.last_prefix) + self.asns = set() # reset + + +class TextDumpParser(RouteDumpParser): + """Parses the RouteViews.org text dump.""" + + def __init__(self, *args, **kwargs): + super(TextDumpParser, self).__init__(*args, **kwargs) + if self.path.endswith('.bz2'): + logger.info('decompressing bz2 file') + self.file = bz2.BZ2File(self.path, buffering=4096) + else: + self.file = open(self.path, buffering=-1) + self.input = itertools.islice(self.file, 5, None) # skip first 5 lines + + def parse_line(self, row): + "Parse one line of input" cols = row.split() # index -1 is i/e/? for igp/egp - origin_as = cols[-2] - # FIXME: skip AS_SETs - if origin_as[0] == '{': - continue + try: + origin_as = int(cols[-2]) + except IndexError: + raise ParseError('unexpected format') + except ValueError: + raise ParseError('bad AS value') prefix = cols[1] @@ -78,85 +163,35 @@ def parse_text(f): s.append('assuming it should be %s' % prefix) logger.warning(' '.join(s)) - # the output may contain multiple paths to the same origin. - # if this is the same prefix as the last entry, we don't need - # to validate it again. - # - # prefixes are sorted, but the origin_as is not, so we keep a set to - # avoid duplicates, and insert into the db once we've seen all the - # origin_as values for a given prefix - if prefix != last_prefix: - # output routes for previous prefix - if last_prefix is not None: - try: - rng = range_class.parse_str(last_prefix) - rmin = long(rng.min) - rmax = long(rng.max) - cursor.executemany(sql, [(asn, rmin, rmax) for asn in asns]) - except BadIPResource: - logger.warning('skipping bad prefix: ' + last_prefix) - - asns = set() - last_prefix = prefix - - try: - asns.add(int(origin_as)) - except ValueError as err: - logger.warning('\n'.join( - ['unable to parse origin AS: ' + origin_as], - ['ValueError: ' + str(err)] - ['route entry was: ' + row], - )) - - logger.info('Committing...') - cursor.execute('COMMIT') + return prefix, origin_as - try: - logger.info('Dropping old table...') - cursor.execute('DROP TABLE IF EXISTS %s_old' % table) - except _mysql_exceptions.Warning: - pass + def cleanup(self): + self.file.close() - logger.info('Swapping staging table with live table...') - cursor.execute('RENAME TABLE %(table)s TO %(table)s_old, %(table)s_new TO %(table)s' % {'table': table}) - transaction.commit_unless_managed() +class MrtDumpParser(RouteDumpParser): + def __init__(self, *args, **kwargs): + super(MrtDumpParser, self).__init__(*args, **kwargs) + # filter input through bgpdump + # bgpdump can decompress bz2 files directly, no need to do it here + self.pipe = subprocess.Popen(['bgpdump', '-m', '-v', self.path], stdout=subprocess.PIPE, bufsize=-1) + self.input = self.pipe.stdout - logger.info('Updating timestamp metadata...') - rpki.gui.app.timestamp.update('bgp_v4_import') - - -def parse_mrt(f): - # filter input through bgpdump - pipe = subprocess.Popen(['bgpdump', '-m', '-v', '-'], stdin=f, - stdout=subprocess.PIPE) - - last_prefix = None - last_as = None - for e in pipe.stdout.readlines(): - a = e.split('|') + def parse_line(self, row): + a = row.split('|') prefix = a[5] try: origin_as = int(a[6].split()[-1]) except ValueError: - # skip AS_SETs - continue - - if prefix != last_prefix: - last_prefix = prefix - elif last_as == origin_as: - continue - last_as = origin_as + raise ParseError('bad AS value') - asns = PREFIXES.get(prefix) - if not asns: - asns = set() - PREFIXES[prefix] = asns - asns.add(origin_as) + return prefix, origin_as - pipe.wait() - if pipe.returncode: - raise ProgException('bgpdump exited with code %d' % pipe.returncode) + def cleanup(self): + logger.info('waiting for child process to terminate') + self.pipe.wait() + if self.pipe.returncode: + raise PipeFailed('bgpdump exited with code %d' % self.pipe.returncode) class ProgException(Exception): @@ -171,7 +206,7 @@ class PipeFailed(ProgException): pass -def import_routeviews_dump(filename=DEFAULT_URL, filetype='auto'): +def import_routeviews_dump(filename=DEFAULT_URL, filetype='text'): """Load the oix-full-snapshot-latest.bz2 from routeview.org into the rpki.gui.routeview database. @@ -199,43 +234,12 @@ def import_routeviews_dump(filename=DEFAULT_URL, filetype='auto'): # URL filename, headers = urlretrieve(filename, tmpname) - if filetype == 'auto': - # try to determine input type from filename, based on the default - # filenames from archive.routeviews.org - bname = os.path.basename(filename) - if bname.startswith('oix-full-snapshot-latest'): - filetype = 'text' - elif bname.startswith('rib.'): - filetype = 'mrt' - else: - raise UnknownInputType('unable to automatically determine input file type') - logging.info('Detected import format as "%s"', filetype) - - pipe = None - if filename.endswith('.bz2'): - bunzip = 'bunzip2' - logging.info('Decompressing input file on the fly...') - pipe = subprocess.Popen([bunzip, '--stdout', filename], - stdout=subprocess.PIPE, - bufsize=-1) - input_file = pipe.stdout - else: - input_file = open(filename) - try: - dispatch = {'text': parse_text, 'mrt': parse_mrt} - dispatch[filetype](input_file) + dispatch = {'text': TextDumpParser, 'mrt': MrtDumpParser} + dispatch[filetype](filename).parse() except KeyError: raise UnknownInputType('"%s" is an unknown input file type' % filetype) - if pipe: - logging.debug('Waiting for child to exit...') - pipe.wait() - if pipe.returncode: - raise PipeFailed('Child exited code %d' % pipe.returncode) - pipe = None - else: - input_file.close() finally: # make sure to always clean up the temp download file if tmpname is not None: |