aboutsummaryrefslogtreecommitdiff
path: root/scripts/rpki/up_down.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rpki/up_down.py')
-rw-r--r--scripts/rpki/up_down.py29
1 files changed, 21 insertions, 8 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py
index d5d3d93e..5c2ad35c 100644
--- a/scripts/rpki/up_down.py
+++ b/scripts/rpki/up_down.py
@@ -110,7 +110,8 @@ class certificate_elt(base_elt):
def toXML(self):
"""Generate a <certificate/> element."""
- elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt = self.make_elt("certificate", "cert_url",
+ "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
elt.text = self.cert.get_Base64()
return elt
@@ -147,7 +148,8 @@ class class_elt(base_elt):
def toXML(self):
"""Generate a <class/> element."""
- elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head")
+ elt = self.make_elt("class", "class_name", "cert_url",
+ "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head")
elt.extend([i.toXML() for i in self.certs])
self.make_b64elt(elt, "issuer", self.issuer.get_DER())
return elt
@@ -163,7 +165,10 @@ class list_pdu(base_elt):
"""Serve one "list" PDU."""
r_msg.payload = list_response_pdu()
irdb_as, irdb_v4, irdb_v6 = rpki.left_right.irdb_query(gctx, child.self_id, child.child_id)
- for ca_id in rpki.sql.fetch_column(gctx, "SELECT ca_id FROM ca WHERE ca.parent_id = parent.parent_id AND parent.self_id = %s" % child.self_id):
+ for ca_id in rpki.sql.fetch_column(gctx, """
+ SELECT ca_id FROM ca
+ WHERE ca.parent_id = parent.parent_id AND parent.self_id = %s
+ """ % child.self_id):
ca_detail = rpki.sql.ca_detail_obj.sql_fetch_active(gctx, ca_id)
if not ca_detail:
continue
@@ -174,7 +179,9 @@ class list_pdu(base_elt):
rc.class_name = str(ca_id)
rc.cert_url = multi_uri(ca_detail.ca_cert_uri)
rc.resource_set_as, rc.resource_set_ipv4, rc.resource_set_ipv6 = rc_as, rc_v4, rc_v6
- for child_cert in rpki.sql.child_cert_obj.sql_fetch_where(gctx, "child_id = %s AND ca_detail_id = %s" % (child.child_id, ca_detail.ca_detail_id)):
+ for child_cert in rpki.sql.child_cert_obj.sql_fetch_where(gctx, """
+ child_id = %s AND ca_detail_id = %s
+ """ % (child.child_id, ca_detail.ca_detail_id)):
c = certificate_elt()
c.cert_url = multi_uri(ca.sia_uri + child_cert.cert.gSKI() + ".cer")
c.cert = child_cert.cert
@@ -231,7 +238,8 @@ class issue_pdu(base_elt):
def toXML(self):
"""Generate payload of "issue" PDU."""
- elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt = self.make_elt("request", "class_name", "req_resource_set_as",
+ "req_resource_set_ipv4", "req_resource_set_ipv6")
elt.text = self.pkcs10.get_Base64()
return [elt]
@@ -249,11 +257,14 @@ class issue_pdu(base_elt):
self.pkcs10.check_valid_rpki()
# Check current cert, if any
- rc_as, rc_v4, rc_v6 = ca_detail.latest_ca_cert.get_3779resources(rpki.left_right.irdb_query(gctx, child.self_id, child.child_id))
+ irdb_resources = rpki.left_right.irdb_query(gctx, child.self_id, child.child_id)
+ rc_as, rc_v4, rc_v6 = ca_detail.latest_ca_cert.get_3779resources(irdb_resources)
req_key = self.pkcs10.getPublicKey()
req_sia = self.pkcs10.get_SIA()
req_ski = self.pkcs10.get_SKI()
- child_cert = rpki.sql.child_cert_obj.sql_fetch_where(gctx, "child_id = %s AND ca_detail_id = %s AND ski = %s" % (child.child_id, ca_detail.ca_detail_id, req_ski))
+ child_cert = rpki.sql.child_cert_obj.sql_fetch_where(gctx, """
+ child_id = %s AND ca_detail_id = %s AND ski = %s
+ """ % (child.child_id, ca_detail.ca_detail_id, req_ski))
assert len(child_cert) < 2
child_cert = child_cert[0] if child_cert else None
@@ -339,7 +350,9 @@ class revoke_pdu(revoke_syntax):
ca_detail = rpki.sql.ca_detail_obj.sql_fetch_active(gctx, ca_id)
if ca is None or ca_detail is None:
raise rpki.exceptions.NotInDatabase
- for c in rpki.sql.child_cert_obj.sql_fetch_where(gctx, "child_id = %s AND ca_detail_id = %s AND ski = %s" % (child.child_id, ca_detail.ca_detail_id, self.get_SKI())):
+ for c in rpki.sql.child_cert_obj.sql_fetch_where(gctx, """
+ child_id = %s AND ca_detail_id = %s AND ski = %s
+ """ % (child.child_id, ca_detail.ca_detail_id, self.get_SKI())):
c.sql_delete()
r_msg.payload = revoke_response_pdu()
r_msg.payload.class_name = self.class_name