diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
commit | b46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 (patch) | |
tree | ca0dc0276d1adc168bc3337ce0564c4ec4957c1b /potpourri/format-application-x-rpki.py | |
parent | 397beaf6d9900dc3b3cb612c89ebf1d57b1d16f6 (diff) |
"Any programmer who fails to comply with the standard naming, formatting,
or commenting conventions should be shot. If it so happens that it is
inconvenient to shoot him, then he is to be politely requested to recode
his program in adherence to the above standard."
-- Michael Spier, Digital Equipment Corporation
svn path=/branches/tk705/; revision=6152
Diffstat (limited to 'potpourri/format-application-x-rpki.py')
-rw-r--r-- | potpourri/format-application-x-rpki.py | 126 |
1 files changed, 63 insertions, 63 deletions
diff --git a/potpourri/format-application-x-rpki.py b/potpourri/format-application-x-rpki.py index 184103f9..44428131 100644 --- a/potpourri/format-application-x-rpki.py +++ b/potpourri/format-application-x-rpki.py @@ -1,12 +1,12 @@ # $Id$ -# +# # Copyright (C) 2014 Dragon Research Labs ("DRL") # Portions copyright (C) 2010--2012 Internet Systems Consortium ("ISC") -# +# # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. -# +# # THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR @@ -53,80 +53,80 @@ parser.add_argument("-u", "--unseen", action = "store_true", args = parser.parse_args() def pprint_cert(b64): - return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() - + return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() + def up_down(): - msg["X-RPKI-Up-Down-Type"] = xml.get("type") - msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") - msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") - msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) - for x in xml: - if x.tag.endswith("class"): - for y in x: - if y.tag.endswith("certificate") or y.tag.endswith("issuer"): - msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) + msg["X-RPKI-Up-Down-Type"] = xml.get("type") + msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") + msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") + msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) + for x in xml: + if x.tag.endswith("class"): + for y in x: + if y.tag.endswith("certificate") or y.tag.endswith("issuer"): + msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) def left_right(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Left-right %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Left-right %s" % xml.get("type") def publication(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Publication %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Publication %s" % xml.get("type") dispatch = { "{http://www.apnic.net/specs/rescerts/up-down/}message" : up_down, "{http://www.hactrn.net/uris/rpki/left-right-spec/}msg" : left_right, "{http://www.hactrn.net/uris/rpki/publication-spec/}msg" : publication } def fix_headers(): - if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: - msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] - msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] - else: - words = srcmsg["Subject"].split() - msg["X-RPKI-PID"] = words[1] - msg["X-RPKI-Object"] = " ".join(words[4:]) - + if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: + msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] + msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] + else: + words = srcmsg["Subject"].split() + msg["X-RPKI-PID"] = words[1] + msg["X-RPKI-Object"] = " ".join(words[4:]) + destination = None source = None try: - destination = mailbox.MH(args.output, factory = None, create = True) - source = mailbox.Maildir(args.input, factory = None) + destination = mailbox.MH(args.output, factory = None, create = True) + source = mailbox.Maildir(args.input, factory = None) - for srckey, srcmsg in source.iteritems(): - if args.unseen and "S" in srcmsg.get_flags(): - continue - assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" - payload = srcmsg.get_payload(decode = True) - cms = rpki.POW.CMS.derRead(payload) - txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) - xml = lxml.etree.fromstring(txt) - tag = xml.tag - if args.tag and tag != args.tag: - continue - msg = email.mime.multipart.MIMEMultipart("related") - msg["X-RPKI-Tag"] = tag - for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): - msg[i] = srcmsg[i] - fix_headers() - if tag in dispatch: - dispatch[tag]() - if "Subject" not in msg: - msg["Subject"] = srcmsg["Subject"] - msg.attach(email.mime.text.MIMEText(txt)) - msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) - msg.epilogue = "\n" # Force trailing newline - key = destination.add(msg) - print "Added", key - if args.kill: - del source[srckey] - elif args.mark: - srcmsg.set_subdir("cur") - srcmsg.add_flag("S") - source[srckey] = srcmsg + for srckey, srcmsg in source.iteritems(): + if args.unseen and "S" in srcmsg.get_flags(): + continue + assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" + payload = srcmsg.get_payload(decode = True) + cms = rpki.POW.CMS.derRead(payload) + txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) + xml = lxml.etree.fromstring(txt) + tag = xml.tag + if args.tag and tag != args.tag: + continue + msg = email.mime.multipart.MIMEMultipart("related") + msg["X-RPKI-Tag"] = tag + for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): + msg[i] = srcmsg[i] + fix_headers() + if tag in dispatch: + dispatch[tag]() + if "Subject" not in msg: + msg["Subject"] = srcmsg["Subject"] + msg.attach(email.mime.text.MIMEText(txt)) + msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) + msg.epilogue = "\n" # Force trailing newline + key = destination.add(msg) + print "Added", key + if args.kill: + del source[srckey] + elif args.mark: + srcmsg.set_subdir("cur") + srcmsg.add_flag("S") + source[srckey] = srcmsg finally: - if destination: - destination.close() - if source: - source.close() + if destination: + destination.close() + if source: + source.close() |