diff options
Diffstat (limited to 'potpourri/format-application-x-rpki.py')
-rw-r--r-- | potpourri/format-application-x-rpki.py | 126 |
1 files changed, 63 insertions, 63 deletions
diff --git a/potpourri/format-application-x-rpki.py b/potpourri/format-application-x-rpki.py index 184103f9..44428131 100644 --- a/potpourri/format-application-x-rpki.py +++ b/potpourri/format-application-x-rpki.py @@ -1,12 +1,12 @@ # $Id$ -# +# # Copyright (C) 2014 Dragon Research Labs ("DRL") # Portions copyright (C) 2010--2012 Internet Systems Consortium ("ISC") -# +# # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. -# +# # THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR @@ -53,80 +53,80 @@ parser.add_argument("-u", "--unseen", action = "store_true", args = parser.parse_args() def pprint_cert(b64): - return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() - + return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint() + def up_down(): - msg["X-RPKI-Up-Down-Type"] = xml.get("type") - msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") - msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") - msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) - for x in xml: - if x.tag.endswith("class"): - for y in x: - if y.tag.endswith("certificate") or y.tag.endswith("issuer"): - msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) + msg["X-RPKI-Up-Down-Type"] = xml.get("type") + msg["X-RPKI-Up-Down-Sender"] = xml.get("sender") + msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient") + msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient")) + for x in xml: + if x.tag.endswith("class"): + for y in x: + if y.tag.endswith("certificate") or y.tag.endswith("issuer"): + msg.attach(email.mime.text.MIMEText(pprint_cert(y.text))) def left_right(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Left-right %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Left-right %s" % xml.get("type") def publication(): - msg["X-RPKI-Left-Right-Type"] = xml.get("type") - msg["Subject"] = "Publication %s" % xml.get("type") + msg["X-RPKI-Left-Right-Type"] = xml.get("type") + msg["Subject"] = "Publication %s" % xml.get("type") dispatch = { "{http://www.apnic.net/specs/rescerts/up-down/}message" : up_down, "{http://www.hactrn.net/uris/rpki/left-right-spec/}msg" : left_right, "{http://www.hactrn.net/uris/rpki/publication-spec/}msg" : publication } def fix_headers(): - if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: - msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] - msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] - else: - words = srcmsg["Subject"].split() - msg["X-RPKI-PID"] = words[1] - msg["X-RPKI-Object"] = " ".join(words[4:]) - + if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg: + msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"] + msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"] + else: + words = srcmsg["Subject"].split() + msg["X-RPKI-PID"] = words[1] + msg["X-RPKI-Object"] = " ".join(words[4:]) + destination = None source = None try: - destination = mailbox.MH(args.output, factory = None, create = True) - source = mailbox.Maildir(args.input, factory = None) + destination = mailbox.MH(args.output, factory = None, create = True) + source = mailbox.Maildir(args.input, factory = None) - for srckey, srcmsg in source.iteritems(): - if args.unseen and "S" in srcmsg.get_flags(): - continue - assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" - payload = srcmsg.get_payload(decode = True) - cms = rpki.POW.CMS.derRead(payload) - txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) - xml = lxml.etree.fromstring(txt) - tag = xml.tag - if args.tag and tag != args.tag: - continue - msg = email.mime.multipart.MIMEMultipart("related") - msg["X-RPKI-Tag"] = tag - for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): - msg[i] = srcmsg[i] - fix_headers() - if tag in dispatch: - dispatch[tag]() - if "Subject" not in msg: - msg["Subject"] = srcmsg["Subject"] - msg.attach(email.mime.text.MIMEText(txt)) - msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) - msg.epilogue = "\n" # Force trailing newline - key = destination.add(msg) - print "Added", key - if args.kill: - del source[srckey] - elif args.mark: - srcmsg.set_subdir("cur") - srcmsg.add_flag("S") - source[srckey] = srcmsg + for srckey, srcmsg in source.iteritems(): + if args.unseen and "S" in srcmsg.get_flags(): + continue + assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki" + payload = srcmsg.get_payload(decode = True) + cms = rpki.POW.CMS.derRead(payload) + txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY) + xml = lxml.etree.fromstring(txt) + tag = xml.tag + if args.tag and tag != args.tag: + continue + msg = email.mime.multipart.MIMEMultipart("related") + msg["X-RPKI-Tag"] = tag + for i in ("Date", "Message-ID", "X-RPKI-Timestamp"): + msg[i] = srcmsg[i] + fix_headers() + if tag in dispatch: + dispatch[tag]() + if "Subject" not in msg: + msg["Subject"] = srcmsg["Subject"] + msg.attach(email.mime.text.MIMEText(txt)) + msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki")) + msg.epilogue = "\n" # Force trailing newline + key = destination.add(msg) + print "Added", key + if args.kill: + del source[srckey] + elif args.mark: + srcmsg.set_subdir("cur") + srcmsg.add_flag("S") + source[srckey] = srcmsg finally: - if destination: - destination.close() - if source: - source.close() + if destination: + destination.close() + if source: + source.close() |