diff options
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r-- | rpki/pubd.py | 53 |
1 files changed, 15 insertions, 38 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py index 229f9511..d0085892 100644 --- a/rpki/pubd.py +++ b/rpki/pubd.py @@ -207,6 +207,7 @@ class main(object): q_cms = rpki.publication.cms_msg_no_sax(DER = q_der) q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) q_cms.check_replay_sql(client, client.client_handle) + self.sql.commit() # commit the replay timestamp if q_msg.get("type") != "query": raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) r_msg = Element(pub_tag_msg, nsmap = pub_nsmap, type = "reply", version = pub_version) @@ -221,7 +222,7 @@ class main(object): r_pdu.set("tag", q_pdu.get("tag")) else: assert q_pdu.tag in (pub_tag_publish, pub_tag_withdraw) - if delta is None and not failed: + if delta is None: delta = self.session.new_delta() client.check_allowed_uri(q_pdu.get("uri")) if q_pdu.tag == pub_tag_publish: @@ -235,40 +236,26 @@ class main(object): if q_pdu.get("tag") is not None: r_pdu.set("tag", q_pdu.get("tag")) except Exception, e: - if not isinstance(e, rpki.exceptions.NotFound): - logger.exception("Exception processing PDU %r", q_pdu) + logger.exception("Exception processing PDU %r", q_pdu) r_pdu = SubElement(r_msg, pub_tag_report_error, error_code = e.__class__.__name__) r_pdu.text = str(e) if q_pdu.get("tag") is not None: r_pdu.set("tag", q_pdu.get("tag")) failed = True - if delta is not None: - delta.sql_delete() - self.session.serial -= 1 - self.session.sql_mark_dirty() - if delta is not None: - assert not failed + if failed: + self.sql.rollback() + + elif delta is not None: delta.activate() self.sql.sweep() self.session.generate_snapshot() self.sql.commit() - - # These could be merged, and perhaps should be, among other - # reasons because we need to do something about expiring old - # deltas and deleting old XML files, and it's probably easier - # to avoid race conditions if we do this all in one place. - - self.session.write_deltas() - self.session.write_snapshot() - self.session.write_notification() - - # Write changes to rsyncd from the delta we just committed - + self.session.write_rrdp_files() # Still need to expire old deltas and delete old XML files. delta.write_rsync() request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) - self.sql.commit() + except Exception, e: logger.exception("Unhandled exception processing client query, path %r", request.path) @@ -381,25 +368,15 @@ class session_obj(rpki.sql.sql_persistent): def notification_fn(self): return "updates.xml" - def write_snapshot(self): + def write_rrdp_files(self): """ - Write current session snapshot to disk. - """ - - self.write_rrdp_file(self.snapshot_fn, self.snapshot) - - def write_deltas(self): - """ - Write any missing deltas to disk. + Write the current set of RRDP files to disk. """ for delta in self.deltas: self.write_rrdp_file(delta.fn, delta.xml) - def write_notification(self): - """ - Write current notification file to disk. - """ + self.write_rrdp_file(self.snapshot_fn, self.snapshot) xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap, version = rrdp_version, @@ -446,11 +423,9 @@ class delta_obj(rpki.sql.sql_persistent): @classmethod def create(cls, session): self = cls() - session.serial += 1 - session.sql_mark_dirty() self.gctx = session.gctx self.session_id = session.session_id - self.serial = session.serial + self.serial = session.serial + 1 self.xml = None self.hash = None self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval @@ -467,6 +442,8 @@ class delta_obj(rpki.sql.sql_persistent): self.xml = ElementToString(self.deltas, pretty_print = True) self.hash = rpki.x509.sha256(self.xml).encode("hex") self.sql_mark_dirty() + self.session.serial += 1 + self.session.sql_mark_dirty() def publish(self, client, der, uri, hash): obj = object_obj.current_object_at_uri(client, self, uri) |