diff options
author | Rob Austein <sra@hactrn.net> | 2008-07-15 17:38:45 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2008-07-15 17:38:45 +0000 |
commit | 8f8a7ea81035f9230c2b27588cb1548279d938b8 (patch) | |
tree | e1dc812ab16793b7f5f36c4b4fb5f18197625de2 /rpkid/irbe_cli.py | |
parent | 9d7bc8e2696d4616b5c73fe6f88d2e0c5dc9e545 (diff) |
Rename a couple of scripts whose names confuse Doxygen. Clean some of
the old test code out of rpkid/ to make real code easier to find.
svn path=/rpkid/Doxyfile; revision=1995
Diffstat (limited to 'rpkid/irbe_cli.py')
-rwxr-xr-x | rpkid/irbe_cli.py | 301 |
1 files changed, 301 insertions, 0 deletions
diff --git a/rpkid/irbe_cli.py b/rpkid/irbe_cli.py new file mode 100755 index 00000000..dda476c5 --- /dev/null +++ b/rpkid/irbe_cli.py @@ -0,0 +1,301 @@ +""" +Command line IR back-end control program for rpkid and pubd. + +$Id$ + +Copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") + +Permission to use, copy, modify, and distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH +REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, +INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +PERFORMANCE OF THIS SOFTWARE. +""" + +import getopt, sys, textwrap +import rpki.left_right, rpki.https, rpki.x509, rpki.config, rpki.log, rpki.publication + +pem_out = None + +class UsageWrapper(textwrap.TextWrapper): + """Call interface around Python textwrap.Textwrapper class.""" + + def __call__(self, *args): + """Format arguments, with TextWrapper indentation.""" + return self.fill(textwrap.dedent(" ".join(args))) + +usage_fill = UsageWrapper(subsequent_indent = " " * 4) + +class cmd_elt_mixin(object): + """Protocol mix-in for command line client element PDUs.""" + + ## @var excludes + # XML attributes and elements that should not be allowed as command + # line arguments. At the moment the only such is the + # bsc.pkcs10_request sub-element, but writing this generally is no + # harder than handling that one special case. + excludes = () + + @classmethod + def usage(cls): + """Generate usage message for this PDU.""" + args = " ".join("--" + x + "=" for x in cls.attributes + cls.elements if x not in cls.excludes) + opts = " ".join("--" + x for x in cls.booleans) + if args and opts: + return args + " " + opts + else: + return args or opts + + def client_getopt(self, argv): + """Parse options for this class.""" + opts, argv = getopt.getopt(argv, "", [x + "=" for x in self.attributes + self.elements if x not in self.excludes] + list(self.booleans)) + for o, a in opts: + o = o[2:] + handler = getattr(self, "client_query_" + o, None) + if handler is not None: + handler(a) + elif o in self.booleans: + setattr(self, o, True) + else: + assert o in self.attributes + setattr(self, o, a) + return argv + + def client_query_bpki_cert(self, arg): + """Special handler for --bpki_cert option.""" + self.bpki_cert = rpki.x509.X509(Auto_file = arg) + + def client_query_glue(self, arg): + """Special handler for --bpki_glue option.""" + self.bpki_glue = rpki.x509.X509(Auto_file = arg) + + def client_query_bpki_cms_cert(self, arg): + """Special handler for --bpki_cms_cert option.""" + self.bpki_cms_cert = rpki.x509.X509(Auto_file = arg) + + def client_query_cms_glue(self, arg): + """Special handler for --bpki_cms_glue option.""" + self.bpki_cms_glue = rpki.x509.X509(Auto_file = arg) + + def client_query_bpki_https_cert(self, arg): + """Special handler for --bpki_https_cert option.""" + self.bpki_https_cert = rpki.x509.X509(Auto_file = arg) + + def client_query_https_glue(self, arg): + """Special handler for --bpki_https_glue option.""" + self.bpki_https_glue = rpki.x509.X509(Auto_file = arg) + + def client_reply_decode(self): + pass + + def client_reply_show(self): + print self.element_name + for i in self.attributes + self.elements: + if getattr(self, i) is not None: + print " %s: %s" % (i, getattr(self, i)) + +class cmd_msg_mixin(object): + """Protocol mix-in for command line client message PDUs.""" + + @classmethod + def usage(cls): + """Generate usage message for this PDU.""" + for k,v in cls.pdus.items(): + print usage_fill(k, v.usage()) + +# left-right protcol + +class self_elt(cmd_elt_mixin, rpki.left_right.self_elt): + pass + +class bsc_elt(cmd_elt_mixin, rpki.left_right.bsc_elt): + + excludes = ("pkcs10_request",) + + def client_query_signing_cert(self, arg): + """--signing_cert option.""" + self.signing_cert = rpki.x509.X509(Auto_file = arg) + + def client_query_signing_cert_crl(self, arg): + """--signing_cert_crl option.""" + self.signing_cert_crl = rpki.x509.CRL(Auto_file = arg) + + def client_reply_decode(self): + global pem_out + if pem_out is not None and self.pkcs10_request is not None: + if isinstance(pem_out, str): + pem_out = open(pem_out, "w") + pem_out.write(self.pkcs10_request.get_PEM()) + +class parent_elt(cmd_elt_mixin, rpki.left_right.parent_elt): + pass + +class child_elt(cmd_elt_mixin, rpki.left_right.child_elt): + pass + +class repository_elt(cmd_elt_mixin, rpki.left_right.repository_elt): + pass + +class route_origin_elt(cmd_elt_mixin, rpki.left_right.route_origin_elt): + + def client_query_as_number(self, arg): + """Handle autonomous sequence numbers.""" + self.as_number = long(arg) + + def client_query_ipv4(self, arg): + """Handle IPv4 addresses.""" + self.ipv4 = resource_set.roa_prefix_set_ipv4(arg) + + def client_query_ipv6(self, arg): + """Handle IPv6 addresses.""" + self.ipv6 = resource_set.roa_prefix_set_ipv6(arg) + +class left_right_msg(cmd_msg_mixin, rpki.left_right.msg): + pdus = dict((x.element_name, x) + for x in (self_elt, bsc_elt, parent_elt, child_elt, repository_elt, route_origin_elt)) + +class left_right_sax_handler(rpki.left_right.sax_handler): + pdu = left_right_msg + +class left_right_cms_msg(rpki.left_right.cms_msg): + saxify = left_right_sax_handler.saxify + +# Publication protocol + +class config_elt(cmd_elt_mixin, rpki.publication.config_elt): + + def client_query_bpki_crl(self, arg): + """Special handler for --bpki_crl option.""" + self.bpki_crl = rpki.x509.CRL(Auto_file = arg) + +class client_elt(cmd_elt_mixin, rpki.publication.client_elt): + pass + +class certificate_elt(cmd_elt_mixin, rpki.publication.certificate_elt): + pass + +class crl_elt(cmd_elt_mixin, rpki.publication.crl_elt): + pass + +class manifest_elt(cmd_elt_mixin, rpki.publication.manifest_elt): + pass + +class roa_elt(cmd_elt_mixin, rpki.publication.roa_elt): + pass + +class publication_msg(cmd_msg_mixin, rpki.publication.msg): + pdus = dict((x.element_name, x) + for x in (config_elt, client_elt, certificate_elt, crl_elt, manifest_elt, roa_elt)) + +class publication_sax_handler(rpki.publication.sax_handler): + pdu = publication_msg + +class publication_cms_msg(rpki.publication.cms_msg): + saxify = publication_sax_handler.saxify + +# Usage + +top_opts = ["config=", "help", "pem_out=", "verbose"] + +def usage(code = 1): + print __doc__.strip() + print + print "Usage:" + print + print "# Top-level options:" + print usage_fill(*["--" + x for x in top_opts]) + print + print "# left-right protocol:" + left_right_msg.usage() + print + print "# publication protocol:" + publication_msg.usage() + sys.exit(code) + +# This should probably be a method of an as-yet-unwritten server class + +def call_daemon(cms_class, client_key, client_cert, server_ta, url, q_msg): + q_cms, q_xml = cms_class.wrap(q_msg, client_key, client_cert, pretty_print = True) + if verbose: + print q_xml + der = rpki.https.client(client_key = client_key, + client_cert = client_cert, + server_ta = server_ta, + url = url, + msg = q_cms) + r_msg, r_xml = cms_class.unwrap(der, server_ta, pretty_print = True) + print r_xml + for r_pdu in r_msg: + r_pdu.client_reply_decode() + +# Main program + +rpki.log.init("irbe_cli") + +argv = sys.argv[1:] + +if not argv: + usage(0) + +cfg_file = "irbe.conf" +verbose = False + +opts, argv = getopt.getopt(argv, "c:hpv?", top_opts) +for o, a in opts: + if o in ("-?", "-h", "--help"): + usage(0) + elif o in ("-c", "--config"): + cfg_file = a + elif o in ("-p", "--pem_out"): + pem_out = a + elif o in ("-v", "--verbose"): + verbose = True + +if not argv: + usage(1) + +cfg = rpki.config.parser(cfg_file, "irbe_cli") + +q_msg_left_right = left_right_msg() +q_msg_left_right.type = "query" + +q_msg_publication = publication_msg() +q_msg_publication.type = "query" + +while argv: + if argv[0] in left_right_msg.pdus: + q_pdu = left_right_msg.pdus[argv[0]]() + q_msg = q_msg_left_right + elif argv[0] in publication_msg.pdus: + q_pdu = publication_msg.pdus[argv[0]]() + q_msg = q_msg_publication + else: + usage(1) + argv = q_pdu.client_getopt(argv[1:]) + q_msg.append(q_pdu) + +if q_msg_left_right: + call_daemon( + cms_class = left_right_cms_msg, + client_key = rpki.x509.RSA( Auto_file = cfg.get("rpkid-irbe-key")), + client_cert = rpki.x509.X509(Auto_file = cfg.get("rpkid-irbe-cert")), + server_ta = (rpki.x509.X509(Auto_file = cfg.get("rpkid-bpki-ta")), + rpki.x509.X509(Auto_file = cfg.get("rpkid-cert"))), + url = cfg.get("rpkid-url"), + q_msg = q_msg_left_right) + +if q_msg_publication: + call_daemon( + cms_class = publication_cms_msg, + client_key = rpki.x509.RSA( Auto_file = cfg.get("pubd-irbe-key")), + client_cert = rpki.x509.X509(Auto_file = cfg.get("pubd-irbe-cert")), + server_ta = (rpki.x509.X509(Auto_file = cfg.get("pubd-bpki-ta")), + rpki.x509.X509(Auto_file = cfg.get("pubd-cert"))), + url = cfg.get("pubd-url"), + q_msg = q_msg_publication) |