aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r--rpki/rpkid.py263
1 files changed, 172 insertions, 91 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index 36ee2ea9..45044ab8 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -318,6 +318,7 @@ class main(object):
Record that we were still alive when we got here, by resetting
keepalive timer.
"""
+
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
@@ -325,6 +326,7 @@ class main(object):
"""
Add a task to the scheduler task queue, unless it's already queued.
"""
+
if task not in self.task_queue:
logger.debug("Adding %r to task queue", task)
self.task_queue.append(task)
@@ -339,6 +341,7 @@ class main(object):
queue (we don't want to run it directly, as that could eventually
blow out our call stack).
"""
+
try:
self.task_current = self.task_queue.pop(0)
except IndexError:
@@ -350,6 +353,7 @@ class main(object):
"""
Run first task on the task queue, unless one is running already.
"""
+
if self.task_current is None:
self.task_next()
@@ -446,6 +450,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch parent object to which this CA object links.
"""
+
return rpki.left_right.parent_elt.sql_fetch(self.gctx, self.parent_id)
@property
@@ -453,6 +458,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch all ca_detail objects that link to this CA object.
"""
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s", (self.ca_id,))
@property
@@ -460,6 +466,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch the pending ca_details for this CA, if any.
"""
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'pending'", (self.ca_id,))
@property
@@ -467,6 +474,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch the active ca_detail for this CA, if any.
"""
+
return ca_detail_obj.sql_fetch_where1(self.gctx, "ca_id = %s AND state = 'active'", (self.ca_id,))
@property
@@ -474,6 +482,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch deprecated ca_details for this CA, if any.
"""
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'deprecated'", (self.ca_id,))
@property
@@ -481,6 +490,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch active and deprecated ca_details for this CA, if any.
"""
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND (state = 'active' OR state = 'deprecated')", (self.ca_id,))
@property
@@ -488,6 +498,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Fetch revoked ca_details for this CA, if any.
"""
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'revoked'", (self.ca_id,))
@property
@@ -496,7 +507,7 @@ class ca_obj(rpki.sql.sql_persistent):
Fetch ca_details which are candidates for consideration when
processing an up-down issue_response PDU.
"""
- #return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,))
+
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state != 'revoked'", (self.ca_id,))
def construct_sia_uri(self, parent, rc):
@@ -542,7 +553,8 @@ class ca_obj(rpki.sql.sql_persistent):
if rc_cert is None:
- logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, maybe parent certificate went away?",
+ logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, "
+ "maybe parent certificate went away?",
ca_detail.public_key.gSKI(), rc.class_name, parent.self.self_handle, parent.parent_handle)
publisher = publication_queue()
ca_detail.delete(ca = ca_detail.ca, publisher = publisher)
@@ -677,6 +689,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Allocate a certificate serial number.
"""
+
self.last_issued_sn += 1
self.sql_mark_dirty()
return self.last_issued_sn
@@ -685,6 +698,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Allocate a manifest serial number.
"""
+
self.last_manifest_sn += 1
self.sql_mark_dirty()
return self.last_manifest_sn
@@ -693,6 +707,7 @@ class ca_obj(rpki.sql.sql_persistent):
"""
Allocate a CRL serial number.
"""
+
self.last_crl_sn += 1
self.sql_mark_dirty()
return self.last_crl_sn
@@ -783,6 +798,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Extra assertions for SQL decode of a ca_detail_obj.
"""
+
rpki.sql.sql_persistent.sql_decode(self, vals)
assert self.public_key is None or self.private_key_id is None or self.public_key.get_DER() == self.private_key_id.get_public_DER()
assert self.manifest_public_key is None or self.manifest_private_key_id is None or self.manifest_public_key.get_DER() == self.manifest_private_key_id.get_public_DER()
@@ -793,12 +809,14 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Fetch CA object to which this ca_detail links.
"""
+
return ca_obj.sql_fetch(self.gctx, self.ca_id)
def fetch_child_certs(self, child = None, ski = None, unique = False, unpublished = None):
"""
Fetch all child_cert objects that link to this ca_detail.
"""
+
return rpki.rpkid.child_cert_obj.fetch(self.gctx, child, self, ski, unique, unpublished)
@property
@@ -806,6 +824,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Fetch all child_cert objects that link to this ca_detail.
"""
+
return self.fetch_child_certs()
def unpublished_child_certs(self, when):
@@ -813,6 +832,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
Fetch all unpublished child_cert objects linked to this ca_detail
with attempted publication dates older than when.
"""
+
return self.fetch_child_certs(unpublished = when)
@property
@@ -820,6 +840,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Fetch all revoked_cert objects that link to this ca_detail.
"""
+
return revoked_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,))
@property
@@ -827,6 +848,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Fetch all ROA objects that link to this ca_detail.
"""
+
return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,))
def unpublished_roas(self, when):
@@ -834,6 +856,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
Fetch all unpublished ROA objects linked to this ca_detail with
attempted publication dates older than when.
"""
+
return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when))
@property
@@ -841,27 +864,39 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Fetch all Ghostbuster objects that link to this ca_detail.
"""
+
return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,))
+ def unpublished_ghostbusters(self, when):
+ """
+ Fetch all unpublished Ghostbusters objects linked to this
+ ca_detail with attempted publication dates older than when.
+ """
+
+ return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when))
+
@property
def ee_certificates(self):
"""
Fetch all EE certificate objects that link to this ca_detail.
"""
+
return rpki.rpkid.ee_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,))
- def unpublished_ghostbusters(self, when):
+ def unpublished_ee_certificates(self, when):
"""
- Fetch all unpublished Ghostbusters objects linked to this
+ Fetch all unpublished EE certificate objects linked to this
ca_detail with attempted publication dates older than when.
"""
- return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when))
+
+ return rpki.rpkid.ee_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when))
@property
def crl_uri(self):
"""
Return publication URI for this ca_detail's CRL.
"""
+
return self.ca.sia_uri + self.crl_uri_tail
@property
@@ -869,6 +904,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Return tail (filename portion) of publication URI for this ca_detail's CRL.
"""
+
return self.public_key.gSKI() + ".crl"
@property
@@ -876,12 +912,14 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Return publication URI for this ca_detail's manifest.
"""
+
return self.ca.sia_uri + self.public_key.gSKI() + ".mft"
def has_expired(self):
"""
Return whether this ca_detail's certificate has expired.
"""
+
return self.latest_ca_cert.getNotAfter() <= rpki.sundial.now()
def covers(self, target):
@@ -933,11 +971,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
repository = ca.parent.repository
handler = False if allow_failure else None
for child_cert in self.child_certs:
- publisher.withdraw(cls = rpki.publication.certificate_elt,
- uri = child_cert.uri,
- obj = child_cert.cert,
- repository = repository,
- handler = handler)
+ publisher.queue(uri = child_cert.uri,
+ old_obj = child_cert.cert,
+ repository = repository,
+ handler = handler)
child_cert.sql_mark_deleted()
for roa in self.roas:
roa.revoke(publisher = publisher, allow_failure = allow_failure, fast = True)
@@ -948,21 +985,19 @@ class ca_detail_obj(rpki.sql.sql_persistent):
except AttributeError:
latest_manifest = None
if latest_manifest is not None:
- publisher.withdraw(cls = rpki.publication.manifest_elt,
- uri = self.manifest_uri,
- obj = self.latest_manifest,
- repository = repository,
- handler = handler)
+ publisher.queue(uri = self.manifest_uri,
+ old_obj = self.latest_manifest,
+ repository = repository,
+ handler = handler)
try:
latest_crl = self.latest_crl
except AttributeError:
latest_crl = None
if latest_crl is not None:
- publisher.withdraw(cls = rpki.publication.crl_elt,
- uri = self.crl_uri,
- obj = self.latest_crl,
- repository = repository,
- handler = handler)
+ publisher.queue(uri = self.crl_uri,
+ old_obj = self.latest_crl,
+ repository = repository,
+ handler = handler)
self.gctx.sql.sweep()
for cert in self.revoked_certs: # + self.child_certs
logger.debug("Deleting %r", cert)
@@ -1172,6 +1207,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
notAfter = resources.valid_until)
if child_cert is None:
+ old_cert = None
child_cert = rpki.rpkid.child_cert_obj(
gctx = child.gctx,
child_id = child.child_id,
@@ -1179,6 +1215,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
cert = cert)
logger.debug("Created new child_cert %r", child_cert)
else:
+ old_cert = child_cert.cert
child_cert.cert = cert
del child_cert.ca_detail
child_cert.ca_detail_id = self.ca_detail_id
@@ -1187,10 +1224,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
child_cert.ski = cert.get_SKI()
child_cert.published = rpki.sundial.now()
child_cert.sql_store()
- publisher.publish(
- cls = rpki.publication.certificate_elt,
+ publisher.queue(
uri = child_cert.uri,
- obj = child_cert.cert,
+ old_obj = old_cert,
+ new_obj = child_cert.cert,
repository = ca.parent.repository,
handler = child_cert.published_callback)
self.generate_manifest(publisher = publisher)
@@ -1221,6 +1258,8 @@ class ca_detail_obj(rpki.sql.sql_persistent):
certlist.append((revoked_cert.serial, revoked_cert.revoked))
certlist.sort()
+ old_crl = self.latest_crl
+
self.latest_crl = rpki.x509.CRL.generate(
keypair = self.private_key_id,
issuer = self.latest_ca_cert,
@@ -1231,10 +1270,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.crl_published = rpki.sundial.now()
self.sql_mark_dirty()
- publisher.publish(
- cls = rpki.publication.crl_elt,
+ publisher.queue(
uri = self.crl_uri,
- obj = self.latest_crl,
+ old_obj = old_crl,
+ new_obj = self.latest_crl,
repository = parent.repository,
handler = self.crl_published_callback)
@@ -1242,6 +1281,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
Check result of CRL publication.
"""
+
pdu.raise_if_error()
self.crl_published = None
self.sql_mark_dirty()
@@ -1278,6 +1318,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
objs.extend((e.uri_tail, e.cert) for e in self.ee_certificates)
logger.debug("Building manifest object %s", uri)
+ old_manifest = self.latest_manifest
self.latest_manifest = rpki.x509.SignedManifest.build(
serial = ca.next_manifest_number(),
thisUpdate = now,
@@ -1290,16 +1331,17 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.manifest_published = rpki.sundial.now()
self.sql_mark_dirty()
- publisher.publish(cls = rpki.publication.manifest_elt,
- uri = uri,
- obj = self.latest_manifest,
- repository = parent.repository,
- handler = self.manifest_published_callback)
+ publisher.queue(uri = uri,
+ old_obj = old_manifest,
+ new_obj = self.latest_manifest,
+ repository = parent.repository,
+ handler = self.manifest_published_callback)
def manifest_published_callback(self, pdu):
"""
Check result of manifest publication.
"""
+
pdu.raise_if_error()
self.manifest_published = None
self.sql_mark_dirty()
@@ -1361,21 +1403,19 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.crl_published is not None and \
self.crl_published < stale:
logger.debug("Retrying publication for %s", self.crl_uri)
- publisher.publish(cls = rpki.publication.crl_elt,
- uri = self.crl_uri,
- obj = self.latest_crl,
- repository = repository,
- handler = self.crl_published_callback)
+ publisher.queue(uri = self.crl_uri,
+ new_obj = self.latest_crl,
+ repository = repository,
+ handler = self.crl_published_callback)
if self.latest_manifest is not None and \
self.manifest_published is not None and \
self.manifest_published < stale:
logger.debug("Retrying publication for %s", self.manifest_uri)
- publisher.publish(cls = rpki.publication.manifest_elt,
- uri = self.manifest_uri,
- obj = self.latest_manifest,
- repository = repository,
- handler = self.manifest_published_callback)
+ publisher.queue(uri = self.manifest_uri,
+ new_obj = self.latest_manifest,
+ repository = repository,
+ handler = self.manifest_published_callback)
if not check_all:
return
@@ -1385,31 +1425,37 @@ class ca_detail_obj(rpki.sql.sql_persistent):
for child_cert in self.unpublished_child_certs(stale):
logger.debug("Retrying publication for %s", child_cert)
- publisher.publish(
- cls = rpki.publication.certificate_elt,
+ publisher.queue(
uri = child_cert.uri,
- obj = child_cert.cert,
+ new_obj = child_cert.cert,
repository = repository,
handler = child_cert.published_callback)
for roa in self.unpublished_roas(stale):
logger.debug("Retrying publication for %s", roa)
- publisher.publish(
- cls = rpki.publication.roa_elt,
+ publisher.queue(
uri = roa.uri,
- obj = roa.roa,
+ new_obj = roa.roa,
repository = repository,
handler = roa.published_callback)
for ghostbuster in self.unpublished_ghostbusters(stale):
logger.debug("Retrying publication for %s", ghostbuster)
- publisher.publish(
- cls = rpki.publication.ghostbuster_elt,
+ publisher.queue(
uri = ghostbuster.uri,
- obj = ghostbuster.ghostbuster,
+ new_obj = ghostbuster.ghostbuster,
repository = repository,
handler = ghostbuster.published_callback)
+ for ee_cert in self.unpublished_ee_certificates(stale):
+ logger.debug("Retrying publication for %s", ee_cert)
+ publisher.queue(
+ uri = ee_cert.uri,
+ new_obj = ee_cert.cert,
+ repository = repository,
+ handler = ee_cert.published_callback)
+
+
class child_cert_obj(rpki.sql.sql_persistent):
"""
Certificate that has been issued to a child.
@@ -1436,6 +1482,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Initialize a child_cert_obj.
"""
+
rpki.sql.sql_persistent.__init__(self)
self.gctx = gctx
self.child_id = child_id
@@ -1451,6 +1498,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Fetch child object to which this child_cert object links.
"""
+
return rpki.left_right.child_elt.sql_fetch(self.gctx, self.child_id)
@property
@@ -1459,6 +1507,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Fetch ca_detail object to which this child_cert object links.
"""
+
return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
@ca_detail.deleter
@@ -1473,6 +1522,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Return the tail (filename) portion of the URI for this child_cert.
"""
+
return self.cert.gSKI() + ".cer"
@property
@@ -1480,6 +1530,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Return the publication URI for this child_cert.
"""
+
return self.ca_detail.ca.sia_uri + self.uri_tail
def revoke(self, publisher, generate_crl_and_manifest = True):
@@ -1491,10 +1542,9 @@ class child_cert_obj(rpki.sql.sql_persistent):
ca = ca_detail.ca
logger.debug("Revoking %r %r", self, self.uri)
revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail)
- publisher.withdraw(
- cls = rpki.publication.certificate_elt,
- uri = self.uri,
- obj = self.cert,
+ publisher.queue(
+ uri = self.uri,
+ old_obj = self.cert,
repository = ca.parent.repository)
self.gctx.sql.sweep()
self.sql_delete()
@@ -1625,6 +1675,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""
Publication callback: check result and mark published.
"""
+
pdu.raise_if_error()
self.published = None
self.sql_mark_dirty()
@@ -1649,6 +1700,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent):
"""
Initialize a revoked_cert_obj.
"""
+
rpki.sql.sql_persistent.__init__(self)
self.gctx = gctx
self.serial = serial
@@ -1664,6 +1716,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent):
"""
Fetch ca_detail object to which this revoked_cert_obj links.
"""
+
return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
@classmethod
@@ -1671,6 +1724,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent):
"""
Revoke a certificate.
"""
+
return cls(
serial = cert.getSerial(),
expires = cert.getNotAfter(),
@@ -1712,6 +1766,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Fetch ca_detail object to which this roa_obj links.
"""
+
return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
@ca_detail.deleter
@@ -1725,6 +1780,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Extra SQL fetch actions for roa_obj -- handle prefix lists.
"""
+
for version, datatype, attribute in ((4, rpki.resource_set.roa_prefix_set_ipv4, "ipv4"),
(6, rpki.resource_set.roa_prefix_set_ipv6, "ipv6")):
setattr(self, attribute, datatype.from_sql(
@@ -1739,6 +1795,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Extra SQL insert actions for roa_obj -- handle prefix lists.
"""
+
for version, prefix_set in ((4, self.ipv4), (6, self.ipv6)):
if prefix_set:
self.gctx.sql.executemany(
@@ -1753,6 +1810,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Extra SQL delete actions for roa_obj -- handle prefix lists.
"""
+
self.gctx.sql.execute("DELETE FROM roa_prefix WHERE roa_id = %s", (self.roa_id,))
def __repr__(self):
@@ -1894,10 +1952,9 @@ class roa_obj(rpki.sql.sql_persistent):
self.sql_store()
logger.debug("Generating %r URI %s", self, self.uri)
- publisher.publish(
- cls = rpki.publication.roa_elt,
+ publisher.queue(
uri = self.uri,
- obj = self.roa,
+ new_obj = self.roa,
repository = ca.parent.repository,
handler = self.published_callback)
if not fast:
@@ -1942,9 +1999,10 @@ class roa_obj(rpki.sql.sql_persistent):
logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri)
rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
- publisher.withdraw(cls = rpki.publication.roa_elt, uri = uri, obj = roa,
- repository = ca_detail.ca.parent.repository,
- handler = False if allow_failure else None)
+ publisher.queue(uri = uri,
+ old_obj = roa,
+ repository = ca_detail.ca.parent.repository,
+ handler = False if allow_failure else None)
if not regenerate:
self.sql_mark_deleted()
@@ -1958,6 +2016,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Reissue ROA associated with this roa_obj.
"""
+
if self.ca_detail is None:
self.generate(publisher = publisher, fast = fast)
else:
@@ -1967,6 +2026,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Return publication URI for a public key.
"""
+
return self.ca_detail.ca.sia_uri + key.gSKI() + ".roa"
@property
@@ -1974,6 +2034,7 @@ class roa_obj(rpki.sql.sql_persistent):
"""
Return the publication URI for this roa_obj's ROA.
"""
+
return self.ca_detail.ca.sia_uri + self.uri_tail
@property
@@ -1982,6 +2043,7 @@ class roa_obj(rpki.sql.sql_persistent):
Return the tail (filename portion) of the publication URI for this
roa_obj's ROA.
"""
+
return self.cert.gSKI() + ".roa"
@@ -2024,6 +2086,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Fetch self object to which this ghostbuster_obj links.
"""
+
return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id)
@property
@@ -2032,6 +2095,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Fetch ca_detail object to which this ghostbuster_obj links.
"""
+
return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
def __init__(self, gctx = None, self_id = None, ca_detail_id = None, vcard = None):
@@ -2097,10 +2161,9 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
self.sql_store()
logger.debug("Generating Ghostbuster record %r", self.uri)
- publisher.publish(
- cls = rpki.publication.ghostbuster_elt,
+ publisher.queue(
uri = self.uri,
- obj = self.ghostbuster,
+ new_obj = self.ghostbuster,
repository = ca.parent.repository,
handler = self.published_callback)
if not fast:
@@ -2110,6 +2173,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Check publication result.
"""
+
pdu.raise_if_error()
self.published = None
self.sql_mark_dirty()
@@ -2144,9 +2208,10 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri)
rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
- publisher.withdraw(cls = rpki.publication.ghostbuster_elt, uri = uri, obj = ghostbuster,
- repository = ca_detail.ca.parent.repository,
- handler = False if allow_failure else None)
+ publisher.queue(uri = uri,
+ old_obj = ghostbuster,
+ repository = ca_detail.ca.parent.repository,
+ handler = False if allow_failure else None)
if not regenerate:
self.sql_mark_deleted()
@@ -2160,6 +2225,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Reissue Ghostbuster associated with this ghostbuster_obj.
"""
+
if self.ghostbuster is None:
self.generate(publisher = publisher, fast = fast)
else:
@@ -2169,6 +2235,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Return publication URI for a public key.
"""
+
return self.ca_detail.ca.sia_uri + key.gSKI() + ".gbr"
@property
@@ -2176,6 +2243,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
"""
Return the publication URI for this ghostbuster_obj's ghostbuster.
"""
+
return self.ca_detail.ca.sia_uri + self.uri_tail
@property
@@ -2184,6 +2252,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
Return the tail (filename portion) of the publication URI for this
ghostbuster_obj's ghostbuster.
"""
+
return self.cert.gSKI() + ".gbr"
@@ -2221,6 +2290,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
"""
Fetch self object to which this ee_cert_obj links.
"""
+
return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id)
@property
@@ -2229,6 +2299,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
"""
Fetch ca_detail object to which this ee_cert_obj links.
"""
+
return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
@ca_detail.deleter
@@ -2246,6 +2317,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
Although, really, one has to ask why we don't just store g(SKI)
in rpkid.sql instead of ski....
"""
+
return base64.urlsafe_b64encode(self.ski).rstrip("=")
@gski.setter
@@ -2257,6 +2329,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
"""
Return the publication URI for this ee_cert_obj.
"""
+
return self.ca_detail.ca.sia_uri + self.uri_tail
@property
@@ -2265,6 +2338,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
Return the tail (filename portion) of the publication URI for this
ee_cert_obj.
"""
+
return self.cert.gSKI() + ".cer"
@classmethod
@@ -2292,12 +2366,11 @@ class ee_cert_obj(rpki.sql.sql_persistent):
ca_detail_id = ca_detail.ca_detail_id,
cert = cert)
- publisher.publish(
- cls = rpki.publication.certificate_elt,
- uri = self.uri,
- obj = self.cert,
+ publisher.queue(
+ uri = self.uri,
+ new_obj = self.cert,
repository = ca.parent.repository,
- handler = self.published_callback)
+ handler = self.published_callback)
self.sql_store()
@@ -2316,10 +2389,9 @@ class ee_cert_obj(rpki.sql.sql_persistent):
ca = ca_detail.ca
logger.debug("Revoking %r %r", self, self.uri)
revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail)
- publisher.withdraw(cls = rpki.publication.certificate_elt,
- uri = self.uri,
- obj = self.cert,
- repository = ca.parent.repository)
+ publisher.queue(uri = self.uri,
+ old_obj = self.cert,
+ repository = ca.parent.repository)
self.gctx.sql.sweep()
self.sql_delete()
if generate_crl_and_manifest:
@@ -2401,12 +2473,12 @@ class ee_cert_obj(rpki.sql.sql_persistent):
self.sql_mark_dirty()
- publisher.publish(
- cls = rpki.publication.certificate_elt,
- uri = self.uri,
- obj = self.cert,
+ publisher.queue(
+ uri = self.uri,
+ old_obj = old_cert,
+ new_obj = self.cert,
repository = ca_detail.ca.parent.repository,
- handler = self.published_callback)
+ handler = self.published_callback)
if must_revoke:
revoked_cert_obj.revoke(cert = old_cert.cert, ca_detail = old_ca_detail)
@@ -2423,6 +2495,7 @@ class ee_cert_obj(rpki.sql.sql_persistent):
"""
Publication callback: check result and mark published.
"""
+
pdu.raise_if_error()
self.published = None
self.sql_mark_dirty()
@@ -2451,29 +2524,37 @@ class publication_queue(object):
if self.replace:
self.uris = {}
- def _add(self, uri, obj, repository, handler, make_pdu):
+ def queue(self, uri, repository, handler = None, old_obj = None, new_obj = None):
+
+ assert old_obj is not None or new_obj is not None
+ assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri))
+ assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri))
+
rid = id(repository)
if rid not in self.repositories:
self.repositories[rid] = repository
self.msgs[rid] = rpki.publication.msg.query()
+
if self.replace and uri in self.uris:
- logger.debug("Removing publication duplicate <%s %r %r>",
- self.uris[uri].action, self.uris[uri].uri, self.uris[uri].payload)
+ logger.debug("Removing publication duplicate %r", self.uris[uri])
self.msgs[rid].remove(self.uris.pop(uri))
- pdu = make_pdu(uri = uri, obj = obj)
+
+ hash = None if old_obj is None else rpki.x509.sha256(old_obj.get_Base64()).encode("hex")
+
+ if new_obj is None:
+ pdu = rpki.publication.withdraw_elt.make_pdu(uri = uri, hash = hash)
+ else:
+ pdu = rpki.publication.publish_elt.make_pdu( uri = uri, hash = hash, payload = new_obj)
+
if handler is not None:
self.handlers[id(pdu)] = handler
pdu.tag = id(pdu)
+
self.msgs[rid].append(pdu)
+
if self.replace:
self.uris[uri] = pdu
- def publish(self, cls, uri, obj, repository, handler = None):
- return self._add( uri, obj, repository, handler, cls.make_publish)
-
- def withdraw(self, cls, uri, obj, repository, handler = None):
- return self._add( uri, obj, repository, handler, cls.make_withdraw)
-
def call_pubd(self, cb, eb):
def loop(iterator, rid):
logger.debug("Calling pubd[%r]", self.repositories[rid])
@@ -2488,5 +2569,5 @@ class publication_queue(object):
return sum(len(self.msgs[rid]) for rid in self.repositories)
def empty(self):
- assert (not self.msgs) == (self.size == 0)
+ assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size)
return not self.msgs