From b46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Mon, 26 Oct 2015 06:29:00 +0000 Subject: "Any programmer who fails to comply with the standard naming, formatting, or commenting conventions should be shot. If it so happens that it is inconvenient to shoot him, then he is to be politely requested to recode his program in adherence to the above standard." -- Michael Spier, Digital Equipment Corporation svn path=/branches/tk705/; revision=6152 --- potpourri/format-application-x-rpki.py | 126 ++++++++++++++++----------------- 1 file changed, 63 insertions(+), 63 deletions(-) (limited to 'potpourri/format-application-x-rpki.py') diff --git a/potpourri/format-application-x-rpki.py b/potpourri/format-application-x-rpki.py index 184103f9..44428131 100644 --- a/potpourri/format-application-x-rpki.py +++ b/potpourri/format-application-x-rpki.py @@ -1,12 +1,12 @@ # $Id$ -# +# # Copyright (C) 2014 Dragon Research Labs ("DRL") # Portions copyright (C) 2010--2012 Internet Systems Consortium ("ISC") -# +# # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. -# +# # THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR @@ -53,80 +53,80 @@ parser.add_argument("-u", "--unseen", action = "store_true", args = parser.parse_args() def pprint_cert(b64): - return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() - + return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() + def up_down(): - msg["X-RPKI-Up-Down-Type"] = xml.get("type") - msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") - msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") - msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) - for x in xml: - if x.tag.endswith("class"): - for y in x: - if y.tag.endswith("certificate") or y.tag.endswith("issuer"): - msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) + msg["X-RPKI-Up-Down-Type"] = xml.get("type") + msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") + msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") + msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) + for x in xml: + if x.tag.endswith("class"): + for y in x: + if y.tag.endswith("certificate") or y.tag.endswith("issuer"): + msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) def left_right(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Left-right %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Left-right %s" % xml.get("type") def publication(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Publication %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Publication %s" % xml.get("type") dispatch = { "{http://www.apnic.net/specs/rescerts/up-down/}message" : up_down, "{http://www.hactrn.net/uris/rpki/left-right-spec/}msg" : left_right, "{http://www.hactrn.net/uris/rpki/publication-spec/}msg" : publication } def fix_headers(): - if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: - msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] - msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] - else: - words = srcmsg["Subject"].split() - msg["X-RPKI-PID"] = words[1] - msg["X-RPKI-Object"] = " ".join(words[4:]) - + if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: + msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] + msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] + else: + words = srcmsg["Subject"].split() + msg["X-RPKI-PID"] = words[1] + msg["X-RPKI-Object"] = " ".join(words[4:]) + destination = None source = None try: - destination = mailbox.MH(args.output, factory = None, create = True) - source = mailbox.Maildir(args.input, factory = None) + destination = mailbox.MH(args.output, factory = None, create = True) + source = mailbox.Maildir(args.input, factory = None) - for srckey, srcmsg in source.iteritems(): - if args.unseen and "S" in srcmsg.get_flags(): - continue - assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" - payload = srcmsg.get_payload(decode = True) - cms = rpki.POW.CMS.derRead(payload) - txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) - xml = lxml.etree.fromstring(txt) - tag = xml.tag - if args.tag and tag != args.tag: - continue - msg = email.mime.multipart.MIMEMultipart("related") - msg["X-RPKI-Tag"] = tag - for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): - msg[i] = srcmsg[i] - fix_headers() - if tag in dispatch: - dispatch[tag]() - if "Subject" not in msg: - msg["Subject"] = srcmsg["Subject"] - msg.attach(email.mime.text.MIMEText(txt)) - msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) - msg.epilogue = "\n" # Force trailing newline - key = destination.add(msg) - print "Added", key - if args.kill: - del source[srckey] - elif args.mark: - srcmsg.set_subdir("cur") - srcmsg.add_flag("S") - source[srckey] = srcmsg + for srckey, srcmsg in source.iteritems(): + if args.unseen and "S" in srcmsg.get_flags(): + continue + assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" + payload = srcmsg.get_payload(decode = True) + cms = rpki.POW.CMS.derRead(payload) + txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) + xml = lxml.etree.fromstring(txt) + tag = xml.tag + if args.tag and tag != args.tag: + continue + msg = email.mime.multipart.MIMEMultipart("related") + msg["X-RPKI-Tag"] = tag + for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): + msg[i] = srcmsg[i] + fix_headers() + if tag in dispatch: + dispatch[tag]() + if "Subject" not in msg: + msg["Subject"] = srcmsg["Subject"] + msg.attach(email.mime.text.MIMEText(txt)) + msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) + msg.epilogue = "\n" # Force trailing newline + key = destination.add(msg) + print "Added", key + if args.kill: + del source[srckey] + elif args.mark: + srcmsg.set_subdir("cur") + srcmsg.add_flag("S") + source[srckey] = srcmsg finally: - if destination: - destination.close() - if source: - source.close() + if destination: + destination.close() + if source: + source.close() -- cgit v1.2.3