aboutsummaryrefslogtreecommitdiff
path: root/potpourri/format-application-x-rpki.py
diff options
context:
space:
mode:
Diffstat (limited to 'potpourri/format-application-x-rpki.py')
-rw-r--r--potpourri/format-application-x-rpki.py126
1 files changed, 63 insertions, 63 deletions
diff --git a/potpourri/format-application-x-rpki.py b/potpourri/format-application-x-rpki.py
index 184103f9..44428131 100644
--- a/potpourri/format-application-x-rpki.py
+++ b/potpourri/format-application-x-rpki.py
@@ -1,12 +1,12 @@
# $Id$
-#
+#
# Copyright (C) 2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2010--2012 Internet Systems Consortium ("ISC")
-#
+#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
-#
+#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR
@@ -53,80 +53,80 @@ parser.add_argument("-u", "--unseen", action = "store_true",
args = parser.parse_args()
def pprint_cert(b64):
- return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint()
-
+ return rpki.POW.X509.derRead(base64.b64decode(b64)).pprint()
+
def up_down():
- msg["X-RPKI-Up-Down-Type"] = xml.get("type")
- msg["X-RPKI-Up-Down-Sender"] = xml.get("sender")
- msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient")
- msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient"))
- for x in xml:
- if x.tag.endswith("class"):
- for y in x:
- if y.tag.endswith("certificate") or y.tag.endswith("issuer"):
- msg.attach(email.mime.text.MIMEText(pprint_cert(y.text)))
+ msg["X-RPKI-Up-Down-Type"] = xml.get("type")
+ msg["X-RPKI-Up-Down-Sender"] = xml.get("sender")
+ msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient")
+ msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient"))
+ for x in xml:
+ if x.tag.endswith("class"):
+ for y in x:
+ if y.tag.endswith("certificate") or y.tag.endswith("issuer"):
+ msg.attach(email.mime.text.MIMEText(pprint_cert(y.text)))
def left_right():
- msg["X-RPKI-Left-Right-Type"] = xml.get("type")
- msg["Subject"] = "Left-right %s" % xml.get("type")
+ msg["X-RPKI-Left-Right-Type"] = xml.get("type")
+ msg["Subject"] = "Left-right %s" % xml.get("type")
def publication():
- msg["X-RPKI-Left-Right-Type"] = xml.get("type")
- msg["Subject"] = "Publication %s" % xml.get("type")
+ msg["X-RPKI-Left-Right-Type"] = xml.get("type")
+ msg["Subject"] = "Publication %s" % xml.get("type")
dispatch = { "{http://www.apnic.net/specs/rescerts/up-down/}message" : up_down,
"{http://www.hactrn.net/uris/rpki/left-right-spec/}msg" : left_right,
"{http://www.hactrn.net/uris/rpki/publication-spec/}msg" : publication }
def fix_headers():
- if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg:
- msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"]
- msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"]
- else:
- words = srcmsg["Subject"].split()
- msg["X-RPKI-PID"] = words[1]
- msg["X-RPKI-Object"] = " ".join(words[4:])
-
+ if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg:
+ msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"]
+ msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"]
+ else:
+ words = srcmsg["Subject"].split()
+ msg["X-RPKI-PID"] = words[1]
+ msg["X-RPKI-Object"] = " ".join(words[4:])
+
destination = None
source = None
try:
- destination = mailbox.MH(args.output, factory = None, create = True)
- source = mailbox.Maildir(args.input, factory = None)
+ destination = mailbox.MH(args.output, factory = None, create = True)
+ source = mailbox.Maildir(args.input, factory = None)
- for srckey, srcmsg in source.iteritems():
- if args.unseen and "S" in srcmsg.get_flags():
- continue
- assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki"
- payload = srcmsg.get_payload(decode = True)
- cms = rpki.POW.CMS.derRead(payload)
- txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY)
- xml = lxml.etree.fromstring(txt)
- tag = xml.tag
- if args.tag and tag != args.tag:
- continue
- msg = email.mime.multipart.MIMEMultipart("related")
- msg["X-RPKI-Tag"] = tag
- for i in ("Date", "Message-ID", "X-RPKI-Timestamp"):
- msg[i] = srcmsg[i]
- fix_headers()
- if tag in dispatch:
- dispatch[tag]()
- if "Subject" not in msg:
- msg["Subject"] = srcmsg["Subject"]
- msg.attach(email.mime.text.MIMEText(txt))
- msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki"))
- msg.epilogue = "\n" # Force trailing newline
- key = destination.add(msg)
- print "Added", key
- if args.kill:
- del source[srckey]
- elif args.mark:
- srcmsg.set_subdir("cur")
- srcmsg.add_flag("S")
- source[srckey] = srcmsg
+ for srckey, srcmsg in source.iteritems():
+ if args.unseen and "S" in srcmsg.get_flags():
+ continue
+ assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki"
+ payload = srcmsg.get_payload(decode = True)
+ cms = rpki.POW.CMS.derRead(payload)
+ txt = cms.verify(rpki.POW.X509Store(), None, rpki.POW.CMS_NOCRL | rpki.POW.CMS_NO_SIGNER_CERT_VERIFY | rpki.POW.CMS_NO_ATTR_VERIFY | rpki.POW.CMS_NO_CONTENT_VERIFY)
+ xml = lxml.etree.fromstring(txt)
+ tag = xml.tag
+ if args.tag and tag != args.tag:
+ continue
+ msg = email.mime.multipart.MIMEMultipart("related")
+ msg["X-RPKI-Tag"] = tag
+ for i in ("Date", "Message-ID", "X-RPKI-Timestamp"):
+ msg[i] = srcmsg[i]
+ fix_headers()
+ if tag in dispatch:
+ dispatch[tag]()
+ if "Subject" not in msg:
+ msg["Subject"] = srcmsg["Subject"]
+ msg.attach(email.mime.text.MIMEText(txt))
+ msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki"))
+ msg.epilogue = "\n" # Force trailing newline
+ key = destination.add(msg)
+ print "Added", key
+ if args.kill:
+ del source[srckey]
+ elif args.mark:
+ srcmsg.set_subdir("cur")
+ srcmsg.add_flag("S")
+ source[srckey] = srcmsg
finally:
- if destination:
- destination.close()
- if source:
- source.close()
+ if destination:
+ destination.close()
+ if source:
+ source.close()