aboutsummaryrefslogtreecommitdiff
path: root/potpourri/roa-to-irr.py
diff options
context:
space:
mode:
Diffstat (limited to 'potpourri/roa-to-irr.py')
-rw-r--r--potpourri/roa-to-irr.py236
1 files changed, 118 insertions, 118 deletions
diff --git a/potpourri/roa-to-irr.py b/potpourri/roa-to-irr.py
index 500596f8..748f37fa 100644
--- a/potpourri/roa-to-irr.py
+++ b/potpourri/roa-to-irr.py
@@ -1,12 +1,12 @@
# $Id$
-#
+#
# Copyright (C) 2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2010--2012 Internet Systems Consortium ("ISC")
-#
+#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
-#
+#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR
@@ -34,128 +34,128 @@ from time import time, strftime, gmtime, asctime
args = None
class route(object):
- """
- Interesting parts of a route object.
- """
-
- def __init__(self, label, uri, asnum, date, prefix, prefixlen, max_prefixlen):
- self.label = label
- self.uri = uri
- self.asn = asnum
- self.date = date
- self.prefix = prefix
- self.prefixlen = prefixlen
- self.max_prefixlen = self.prefixlen if max_prefixlen is None else max_prefixlen
-
- def __cmp__(self, other):
- result = cmp(self.asn, other.asn)
- if result == 0:
- result = cmp(self.prefix, other.prefix)
- if result == 0:
- result = cmp(self.prefixlen, other.prefixlen)
- if result == 0:
- result = cmp(self.max_prefixlen, other.max_prefixlen)
- if result == 0:
- result = cmp(self.date, other.date)
- return result
-
- def __str__(self):
- lines = "\n" if args.email else ""
- lines += dedent('''\
- {self.label:<14s}{self.prefix}/{self.prefixlen}
- descr: {self.prefix}/{self.prefixlen}-{self.max_prefixlen}
- origin: AS{self.asn:d}
- notify: {args.notify}
- mnt-by: {args.mnt_by}
- changed: {args.changed_by} {self.date}
- source: {args.source}
- ''').format(self = self, args = args)
- if args.password is not None:
- lines += "override: {}\n".format(args.password)
- return lines
-
- def write(self, output_directory):
- name = "{0.prefix}-{0.prefixlen}-{0.max_prefixlen}-AS{0.asn:d}-{0.date}".format(self)
- with open(os.path.join(output_directory, name), "w") as f:
- f.write(str(self))
+ """
+ Interesting parts of a route object.
+ """
+
+ def __init__(self, label, uri, asnum, date, prefix, prefixlen, max_prefixlen):
+ self.label = label
+ self.uri = uri
+ self.asn = asnum
+ self.date = date
+ self.prefix = prefix
+ self.prefixlen = prefixlen
+ self.max_prefixlen = self.prefixlen if max_prefixlen is None else max_prefixlen
+
+ def __cmp__(self, other):
+ result = cmp(self.asn, other.asn)
+ if result == 0:
+ result = cmp(self.prefix, other.prefix)
+ if result == 0:
+ result = cmp(self.prefixlen, other.prefixlen)
+ if result == 0:
+ result = cmp(self.max_prefixlen, other.max_prefixlen)
+ if result == 0:
+ result = cmp(self.date, other.date)
+ return result
+
+ def __str__(self):
+ lines = "\n" if args.email else ""
+ lines += dedent('''\
+ {self.label:<14s}{self.prefix}/{self.prefixlen}
+ descr: {self.prefix}/{self.prefixlen}-{self.max_prefixlen}
+ origin: AS{self.asn:d}
+ notify: {args.notify}
+ mnt-by: {args.mnt_by}
+ changed: {args.changed_by} {self.date}
+ source: {args.source}
+ ''').format(self = self, args = args)
+ if args.password is not None:
+ lines += "override: {}\n".format(args.password)
+ return lines
+
+ def write(self, output_directory):
+ name = "{0.prefix}-{0.prefixlen}-{0.max_prefixlen}-AS{0.asn:d}-{0.date}".format(self)
+ with open(os.path.join(output_directory, name), "w") as f:
+ f.write(str(self))
class route_list(list):
- """
- A list of route objects.
- """
-
- def __init__(self, rcynic_dir):
- for root, dirs, files in os.walk(rcynic_dir):
- for f in files:
- if f.endswith(".roa"):
- path = os.path.join(root, f)
- uri = "rsync://" + path[len(rcynic_dir):].lstrip("/")
- roa = rpki.x509.ROA(DER_file = path)
- roa.extract()
- assert roa.get_POW().getVersion() == 0, "ROA version is {:d}, expected 0".format(roa.get_POW().getVersion())
- asnum = roa.get_POW().getASID()
- notBefore = roa.get_POW().certs()[0].getNotBefore().strftime("%Y%m%d")
- v4, v6 = roa.get_POW().getPrefixes()
- if v4 is not None:
- for prefix, prefixlen, max_prefixlen in v4:
- self.append(route("route:", uri, asnum, notBefore, prefix, prefixlen, max_prefixlen))
- if v6 is not None:
- for prefix, prefixlen, max_prefixlen in v6:
- self.append(route("route6:", uri, asnum, notBefore, prefix, prefixlen, max_prefixlen))
- self.sort()
- for i in xrange(len(self) - 2, -1, -1):
- if self[i] == self[i + 1]:
- del self[i + 1]
+ """
+ A list of route objects.
+ """
+
+ def __init__(self, rcynic_dir):
+ for root, dirs, files in os.walk(rcynic_dir):
+ for f in files:
+ if f.endswith(".roa"):
+ path = os.path.join(root, f)
+ uri = "rsync://" + path[len(rcynic_dir):].lstrip("/")
+ roa = rpki.x509.ROA(DER_file = path)
+ roa.extract()
+ assert roa.get_POW().getVersion() == 0, "ROA version is {:d}, expected 0".format(roa.get_POW().getVersion())
+ asnum = roa.get_POW().getASID()
+ notBefore = roa.get_POW().certs()[0].getNotBefore().strftime("%Y%m%d")
+ v4, v6 = roa.get_POW().getPrefixes()
+ if v4 is not None:
+ for prefix, prefixlen, max_prefixlen in v4:
+ self.append(route("route:", uri, asnum, notBefore, prefix, prefixlen, max_prefixlen))
+ if v6 is not None:
+ for prefix, prefixlen, max_prefixlen in v6:
+ self.append(route("route6:", uri, asnum, notBefore, prefix, prefixlen, max_prefixlen))
+ self.sort()
+ for i in xrange(len(self) - 2, -1, -1):
+ if self[i] == self[i + 1]:
+ del self[i + 1]
def email_header(f):
- if args.email:
- now = time()
- f.write(dedent('''\
- From {from_} {ctime}
- Date: {date}
- From: {from_}
- Subject: Fake email header to make irr_rpsl_submit happy
- Message-Id: <{pid}.{seconds}@{hostname}>
- ''').format(from_ = args.from_,
- ctime = asctime(gmtime(now)),
- date = strftime("%d %b %Y %T %z", gmtime(now)),
- pid = os.getpid(),
- seconds = now,
- hostname = gethostname()))
+ if args.email:
+ now = time()
+ f.write(dedent('''\
+ From {from_} {ctime}
+ Date: {date}
+ From: {from_}
+ Subject: Fake email header to make irr_rpsl_submit happy
+ Message-Id: <{pid}.{seconds}@{hostname}>
+ ''').format(from_ = args.from_,
+ ctime = asctime(gmtime(now)),
+ date = strftime("%d %b %Y %T %z", gmtime(now)),
+ pid = os.getpid(),
+ seconds = now,
+ hostname = gethostname()))
def main():
- global args
- whoami = "{}@{}".format(os.getlogin(), gethostname())
-
- parser = argparse.ArgumentParser(description = __doc__)
- parser.add_argument("-c", "--changed_by", default = whoami, help = "override \"changed:\" value")
- parser.add_argument("-f", "--from", dest="from_", default = whoami, help = "override \"from:\" header when using --email")
- parser.add_argument("-m", "--mnt_by", default = "MAINT-RPKI", help = "override \"mnt-by:\" value")
- parser.add_argument("-n", "--notify", default = whoami, help = "override \"notify:\" value")
- parser.add_argument("-p", "--password", help = "specify \"override:\" password")
- parser.add_argument("-s", "--source", default = "RPKI", help = "override \"source:\" value")
- group = parser.add_mutually_exclusive_group()
- group.add_argument("-e", "--email", action = "store_true", help = "generate fake RFC 822 header suitable for piping to irr_rpsl_submit")
- group.add_argument("-d", "--output-directory", help = "write route and route6 objects to directory OUTPUT, one object per file")
- parser.add_argument("authenticated_directory", help = "directory tree containing authenticated rcynic output")
- args = parser.parse_args()
-
- if not os.path.isdir(args.authenticated_directory):
- sys.exit('"{}" is not a directory'.format(args.authenticated_directory))
-
- routes = route_list(args.authenticated_directory)
-
- if args.output_directory:
- if not os.path.isdir(args.output_directory):
- os.makedirs(args.output_directory)
- for r in routes:
- r.write(args.output_directory)
- else:
- email_header(sys.stdout)
- for r in routes:
- sys.stdout.write(str(r))
+ global args
+ whoami = "{}@{}".format(os.getlogin(), gethostname())
+
+ parser = argparse.ArgumentParser(description = __doc__)
+ parser.add_argument("-c", "--changed_by", default = whoami, help = "override \"changed:\" value")
+ parser.add_argument("-f", "--from", dest="from_", default = whoami, help = "override \"from:\" header when using --email")
+ parser.add_argument("-m", "--mnt_by", default = "MAINT-RPKI", help = "override \"mnt-by:\" value")
+ parser.add_argument("-n", "--notify", default = whoami, help = "override \"notify:\" value")
+ parser.add_argument("-p", "--password", help = "specify \"override:\" password")
+ parser.add_argument("-s", "--source", default = "RPKI", help = "override \"source:\" value")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("-e", "--email", action = "store_true", help = "generate fake RFC 822 header suitable for piping to irr_rpsl_submit")
+ group.add_argument("-d", "--output-directory", help = "write route and route6 objects to directory OUTPUT, one object per file")
+ parser.add_argument("authenticated_directory", help = "directory tree containing authenticated rcynic output")
+ args = parser.parse_args()
+
+ if not os.path.isdir(args.authenticated_directory):
+ sys.exit('"{}" is not a directory'.format(args.authenticated_directory))
+
+ routes = route_list(args.authenticated_directory)
+
+ if args.output_directory:
+ if not os.path.isdir(args.output_directory):
+ os.makedirs(args.output_directory)
+ for r in routes:
+ r.write(args.output_directory)
+ else:
+ email_header(sys.stdout)
+ for r in routes:
+ sys.stdout.write(str(r))
if __name__ == "__main__":
- main()
+ main()