diff options
author | Rob Austein <sra@hactrn.net> | 2014-07-07 23:13:35 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2014-07-07 23:13:35 +0000 |
commit | 13a65b463cd0acedd3bc36c9437d5ee8b2e26b60 (patch) | |
tree | d129b276d8e77591943d03f1995ac76c3eb6d5f8 /rpki/publication.py | |
parent | bfba2f0ce8f8416b9e5f91542068d0d6470bc19f (diff) |
Checkpoint of SQL-based publish and withdraw processing. Doesn't
handle publish-with-overwrite correctly yet, not generating RRDP files
yet, but passes "make test" without doing anything obviously insane.
svn path=/branches/tk705/; revision=5887
Diffstat (limited to 'rpki/publication.py')
-rw-r--r-- | rpki/publication.py | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/rpki/publication.py b/rpki/publication.py index 87a097c9..19ab2107 100644 --- a/rpki/publication.py +++ b/rpki/publication.py @@ -55,14 +55,14 @@ class base_publication_elt(rpki.xml_utils.base_elt, publication_namespace): def __repr__(self): return rpki.log.log_repr(self, self.uri, self.payload) - def serve_dispatch(self, r_msg, cb, eb): + def serve_dispatch(self, r_msg, snapshot, cb, eb): """ Action dispatch handler. """ try: self.client.check_allowed_uri(self.uri) - self.serve_action() + self.serve_action(snapshot) r_pdu = self.__class__() r_pdu.tag = self.tag r_pdu.uri = self.uri @@ -94,6 +94,7 @@ class base_publication_elt(rpki.xml_utils.base_elt, publication_namespace): """ No-op, since this is not a <report_error/> PDU. """ + pass @@ -121,20 +122,20 @@ class publish_elt(base_publication_elt): elt.text = self.payload.get_Base64() return elt - def serve_action(self): + def serve_action(self, snapshot): """ Publish an object. """ logger.info("Publishing %s", self.payload.tracking_data(self.uri)) + snapshot.publish(self.client, self.payload, self.uri) filename = self.uri_to_filename() filename_tmp = filename + ".tmp" dirname = os.path.dirname(filename) if not os.path.isdir(dirname): os.makedirs(dirname) - f = open(filename_tmp, "wb") - f.write(self.payload.get_DER()) - f.close() + with open(filename_tmp, "wb") as f: + f.write(self.payload.get_DER()) os.rename(filename_tmp, filename) @classmethod @@ -151,12 +152,13 @@ class withdraw_elt(base_publication_elt): element_name = "withdraw" - def serve_action(self): + def serve_action(self, snapshot): """ Withdraw an object, then recursively delete empty directories. """ logger.info("Withdrawing %s", self.uri) + snapshot.withdraw(self.client, self.uri) filename = self.uri_to_filename() try: os.remove(filename) @@ -194,16 +196,18 @@ class report_error_elt(rpki.xml_utils.text_elt, publication_namespace): attributes = ("tag", "error_code") text_attribute = "error_text" + error_code = None error_text = None def __repr__(self): - return rpki.log.log_repr(self) + return rpki.log.log_repr(self, self.error_code, self.error_text) @classmethod def from_exception(cls, e, tag = None): """ Generate a <report_error/> element from an exception. """ + self = cls() self.tag = tag self.error_code = e.__class__.__name__ @@ -223,6 +227,7 @@ class report_error_elt(rpki.xml_utils.text_elt, publication_namespace): """ Raise exception associated with this <report_error/> PDU. """ + t = rpki.exceptions.__dict__.get(self.error_code) if isinstance(t, type) and issubclass(t, rpki.exceptions.RPKI_Exception): raise t(getattr(self, "text", None)) @@ -247,9 +252,11 @@ class msg(rpki.xml_utils.msg, publication_namespace): """ Serve one msg PDU. """ + if not self.is_query(): raise rpki.exceptions.BadQuery("Message type is not query") r_msg = self.__class__.reply() + snapshot = gctx.session.new_snapshot() if len(self) > 0 else None def loop(iterator, q_pdu): @@ -257,18 +264,20 @@ class msg(rpki.xml_utils.msg, publication_namespace): if not isinstance(e, rpki.exceptions.NotFound): logger.exception("Exception processing PDU %r", q_pdu) r_msg.append(report_error_elt.from_exception(e, q_pdu.tag)) + snapshot.sql_delete() cb(r_msg) try: q_pdu.gctx = gctx q_pdu.client = client - q_pdu.serve_dispatch(r_msg, iterator, fail) + q_pdu.serve_dispatch(r_msg, snapshot, iterator, fail) except (rpki.async.ExitNow, SystemExit): raise except Exception, e: fail(e) def done(): + gctx.session.add_snapshot(snapshot) cb(r_msg) rpki.async.iterator(self, loop, done) |