aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2010-01-05 01:21:51 +0000
committerRob Austein <sra@hactrn.net>2010-01-05 01:21:51 +0000
commit97ed2aa372e2cb7985affedd45357a53e30879b9 (patch)
tree2b448d7f8d4acab082269d9ba3cd13291b0d1e45 /scripts
parent54f4de8a9cc2da82eda5a2f675cbe498fd873dde (diff)
Focus on up-down protocol debugging for now.
svn path=/scripts/format-application-x-rpki.py; revision=2927
Diffstat (limited to 'scripts')
-rw-r--r--scripts/format-application-x-rpki.py95
1 files changed, 58 insertions, 37 deletions
diff --git a/scripts/format-application-x-rpki.py b/scripts/format-application-x-rpki.py
index 25929149..e4b4e566 100644
--- a/scripts/format-application-x-rpki.py
+++ b/scripts/format-application-x-rpki.py
@@ -22,20 +22,21 @@ PERFORMANCE OF THIS SOFTWARE.
"""
import email.mime, email.mime.application, email.mime.text, email.mime.multipart, email.utils, email.encoders
-import mailbox, POW, lxml.etree, getopt, sys
+import mailbox, POW, lxml.etree, getopt, sys, base64
-multipart = True
source_name = None
destination_name = None
mark_seen = False
kill_seen = False
+unseen_only = False
+target_tag = "{http://www.apnic.net/specs/rescerts/up-down/}message"
def usage(ok):
- print "Usage: %s [--mark] --input maildir --output mhfolder" % sys.argv[0]
+ print "Usage: %s [--mark] [--kill] [--tag tag]] [--unseen] --input maildir --output mhfolder" % sys.argv[0]
print __doc__
sys.exit(0 if ok else 1)
-opts, argv = getopt.getopt(sys.argv[1:], "hi:kmo:?", ["help", "input=", "kill", "mark", "output="])
+opts, argv = getopt.getopt(sys.argv[1:], "hi:kmo:t:u?", ["help", "input=", "kill", "mark", "output=", "tag=", "unseen"])
for o, a in opts:
if o in ("-h", "--help", "-?"):
usage(ok = True)
@@ -43,16 +44,30 @@ for o, a in opts:
source_name = a
elif o in ("-m", "--mark"):
mark_seen = True
+ elif o in ("-k", "--kill"):
+ kill_seen = True
elif o in ("-o", "--output"):
destination_name = a
+ elif o in ("-t", "--tag"):
+ target_tag = a
+ elif o in ("-u", "--unseen"):
+ unseen_only = True
if argv or source_name is None or destination_name is None:
usage(ok = False)
+def pprint_cert(b64):
+ return POW.derRead(POW.X509_CERTIFICATE, base64.b64decode(b64)).pprint()
+
def up_down():
msg["X-RPKI-Up-Down-Type"] = xml.get("type")
msg["X-RPKI-Up-Down-Sender"] = xml.get("sender")
msg["X-RPKI-Up-Down-Recipient"] = xml.get("recipient")
msg["Subject"] = "Up-down %s %s => %s" % (xml.get("type"), xml.get("sender"), xml.get("recipient"))
+ for x in xml:
+ if x.tag.endswith("class"):
+ for y in x:
+ if y.tag.endswith("certificate") or y.tag.endswith("issuer"):
+ msg.attach(email.mime.text.MIMEText(pprint_cert(y.text)))
def left_right():
msg["X-RPKI-Left-Right-Type"] = xml.get("type")
@@ -66,6 +81,15 @@ dispatch = { "{http://www.apnic.net/specs/rescerts/up-down/}message" : up_down,
"{http://www.hactrn.net/uris/rpki/left-right-spec/}msg" : left_right,
"{http://www.hactrn.net/uris/rpki/publication-spec/}msg" : publication }
+def fix_headers():
+ if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg:
+ msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"]
+ msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"]
+ else:
+ words = srcmsg["Subject"].split()
+ msg["X-RPKI-PID"] = words[1]
+ msg["X-RPKI-Object"] = " ".join(words[4:])
+
destination = None
source = None
try:
@@ -73,39 +97,36 @@ try:
source = mailbox.Maildir(source_name, factory = None)
for srckey, srcmsg in source.iteritems():
- if "S" not in srcmsg.get_flags():
- assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki"
- payload = srcmsg.get_payload(decode = True)
- cms = POW.derRead(POW.CMS_MESSAGE, payload)
- txt = cms.verify(POW.X509Store(), None, POW.CMS_NOCRL | POW.CMS_NO_SIGNER_CERT_VERIFY | POW.CMS_NO_ATTR_VERIFY | POW.CMS_NO_CONTENT_VERIFY)
- xml = lxml.etree.fromstring(txt)
- tag = xml.tag
- msg = email.mime.text.MIMEText(txt)
- if multipart:
- msg = email.mime.multipart.MIMEMultipart("related", None, (msg, email.mime.application.MIMEApplication(payload, "x-rpki")))
- msg["X-RPKI-Tag"] = tag
- for i in ("Date", "Message-ID"):
- msg[i] = srcmsg[i]
- if "X-RPKI-PID" in srcmsg or "X-RPKI-Object" in srcmsg:
- msg["X-RPKI-PID"] = srcmsg["X-RPKI-PID"]
- msg["X-RPKI-Object"] = srcmsg["X-RPKI-Object"]
- else:
- words = srcmsg["Subject"].split()
- msg["X-RPKI-PID"] = words[1]
- msg["X-RPKI-Object"] = " ".join(words[4:])
- if tag in dispatch:
- dispatch[tag]()
- if "Subject" not in msg:
- msg["Subject"] = srcmsg["Subject"]
- msg.epilogue = "\n" # Force trailing newline
- key = destination.add(msg)
- print "Added", key
- if kill_seen:
- srcmsg.discard()
- elif mark_seen:
- srcmsg.set_subdir("cur")
- srcmsg.add_flag("S")
- source[srckey] = srcmsg
+ if unseen_only and "S" in srcmsg.get_flags():
+ continue
+ assert not srcmsg.is_multipart() and srcmsg.get_content_type() == "application/x-rpki"
+ payload = srcmsg.get_payload(decode = True)
+ cms = POW.derRead(POW.CMS_MESSAGE, payload)
+ txt = cms.verify(POW.X509Store(), None, POW.CMS_NOCRL | POW.CMS_NO_SIGNER_CERT_VERIFY | POW.CMS_NO_ATTR_VERIFY | POW.CMS_NO_CONTENT_VERIFY)
+ xml = lxml.etree.fromstring(txt)
+ tag = xml.tag
+ if target_tag and tag != target_tag:
+ continue
+ msg = email.mime.multipart.MIMEMultipart("related")
+ msg["X-RPKI-Tag"] = tag
+ for i in ("Date", "Message-ID"):
+ msg[i] = srcmsg[i]
+ fix_headers()
+ if tag in dispatch:
+ dispatch[tag]()
+ if "Subject" not in msg:
+ msg["Subject"] = srcmsg["Subject"]
+ msg.attach(email.mime.text.MIMEText(txt))
+ msg.attach(email.mime.application.MIMEApplication(payload, "x-rpki"))
+ msg.epilogue = "\n" # Force trailing newline
+ key = destination.add(msg)
+ print "Added", key
+ if kill_seen:
+ del source[srckey]
+ elif mark_seen:
+ srcmsg.set_subdir("cur")
+ srcmsg.add_flag("S")
+ source[srckey] = srcmsg
finally:
if destination: