aboutsummaryrefslogtreecommitdiff
path: root/rpki/pubd.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2014-08-28 21:43:10 +0000
committerRob Austein <sra@hactrn.net>2014-08-28 21:43:10 +0000
commit503187d5486b2d629ea7a157fb427ab1fba67d06 (patch)
tree54ac386e8f120efc059c361be49e039ccae372b5 /rpki/pubd.py
parentf9237931ee7d7a9cdc024913c395a0cbfe395d5e (diff)
Rewrite pubd error handling code.
svn path=/branches/tk705/; revision=5943
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r--rpki/pubd.py53
1 files changed, 15 insertions, 38 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py
index 229f9511..d0085892 100644
--- a/rpki/pubd.py
+++ b/rpki/pubd.py
@@ -207,6 +207,7 @@ class main(object):
q_cms = rpki.publication.cms_msg_no_sax(DER = q_der)
q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
q_cms.check_replay_sql(client, client.client_handle)
+ self.sql.commit() # commit the replay timestamp
if q_msg.get("type") != "query":
raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type"))
r_msg = Element(pub_tag_msg, nsmap = pub_nsmap, type = "reply", version = pub_version)
@@ -221,7 +222,7 @@ class main(object):
r_pdu.set("tag", q_pdu.get("tag"))
else:
assert q_pdu.tag in (pub_tag_publish, pub_tag_withdraw)
- if delta is None and not failed:
+ if delta is None:
delta = self.session.new_delta()
client.check_allowed_uri(q_pdu.get("uri"))
if q_pdu.tag == pub_tag_publish:
@@ -235,40 +236,26 @@ class main(object):
if q_pdu.get("tag") is not None:
r_pdu.set("tag", q_pdu.get("tag"))
except Exception, e:
- if not isinstance(e, rpki.exceptions.NotFound):
- logger.exception("Exception processing PDU %r", q_pdu)
+ logger.exception("Exception processing PDU %r", q_pdu)
r_pdu = SubElement(r_msg, pub_tag_report_error, error_code = e.__class__.__name__)
r_pdu.text = str(e)
if q_pdu.get("tag") is not None:
r_pdu.set("tag", q_pdu.get("tag"))
failed = True
- if delta is not None:
- delta.sql_delete()
- self.session.serial -= 1
- self.session.sql_mark_dirty()
- if delta is not None:
- assert not failed
+ if failed:
+ self.sql.rollback()
+
+ elif delta is not None:
delta.activate()
self.sql.sweep()
self.session.generate_snapshot()
self.sql.commit()
-
- # These could be merged, and perhaps should be, among other
- # reasons because we need to do something about expiring old
- # deltas and deleting old XML files, and it's probably easier
- # to avoid race conditions if we do this all in one place.
-
- self.session.write_deltas()
- self.session.write_snapshot()
- self.session.write_notification()
-
- # Write changes to rsyncd from the delta we just committed
-
+ self.session.write_rrdp_files() # Still need to expire old deltas and delete old XML files.
delta.write_rsync()
request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
- self.sql.commit()
+
except Exception, e:
logger.exception("Unhandled exception processing client query, path %r", request.path)
@@ -381,25 +368,15 @@ class session_obj(rpki.sql.sql_persistent):
def notification_fn(self):
return "updates.xml"
- def write_snapshot(self):
+ def write_rrdp_files(self):
"""
- Write current session snapshot to disk.
- """
-
- self.write_rrdp_file(self.snapshot_fn, self.snapshot)
-
- def write_deltas(self):
- """
- Write any missing deltas to disk.
+ Write the current set of RRDP files to disk.
"""
for delta in self.deltas:
self.write_rrdp_file(delta.fn, delta.xml)
- def write_notification(self):
- """
- Write current notification file to disk.
- """
+ self.write_rrdp_file(self.snapshot_fn, self.snapshot)
xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap,
version = rrdp_version,
@@ -446,11 +423,9 @@ class delta_obj(rpki.sql.sql_persistent):
@classmethod
def create(cls, session):
self = cls()
- session.serial += 1
- session.sql_mark_dirty()
self.gctx = session.gctx
self.session_id = session.session_id
- self.serial = session.serial
+ self.serial = session.serial + 1
self.xml = None
self.hash = None
self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval
@@ -467,6 +442,8 @@ class delta_obj(rpki.sql.sql_persistent):
self.xml = ElementToString(self.deltas, pretty_print = True)
self.hash = rpki.x509.sha256(self.xml).encode("hex")
self.sql_mark_dirty()
+ self.session.serial += 1
+ self.session.sql_mark_dirty()
def publish(self, client, der, uri, hash):
obj = object_obj.current_object_at_uri(client, self, uri)