diff options
Diffstat (limited to 'rpki')
-rw-r--r-- | rpki/pubd.py | 299 | ||||
-rw-r--r-- | rpki/publication.py | 73 | ||||
-rw-r--r-- | rpki/rpkid.py | 2 | ||||
-rw-r--r-- | rpki/sql_schemas.py | 34 |
4 files changed, 188 insertions, 220 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py index 8f5b2605..7d677f0a 100644 --- a/rpki/pubd.py +++ b/rpki/pubd.py @@ -40,10 +40,28 @@ import rpki.publication import rpki.publication_control import rpki.daemonize -from lxml.etree import Element, SubElement, ElementTree, Comment +from lxml.etree import Element, SubElement, ElementTree, Comment, tostring as ElementToString logger = logging.getLogger(__name__) + +# Temporary, these should come from the schema or something +rrdp_namespace = "{http://www.ripe.net/rpki/rrdp}" +rrdp_version = "1" + + + +def Base64SubElement(elt, name, obj, attrib = None, **kwargs): + """ + Convenience wrapper around SubElement for use with Base64 text. + """ + + se = SubElement(elt, name, attrib, **kwargs) + se.text = "\n" + obj.get_Base64() + se.tail = "\n" + return se + + class main(object): """ Main program for pubd. @@ -109,8 +127,9 @@ class main(object): self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False) + if False: + self.rrdp_uri_base = self.cfg.get("rrdp-uri-base") self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h")) - self.rrdp_uri_base = self.cfg.get("rrdp-uri-base") self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", "rrdp-publication/") self.session = session_obj.fetch(self) @@ -151,11 +170,6 @@ class main(object): Process one PDU from a client. """ - def done(r_msg): - self.sql.sweep() - cb(code = 200, - body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) - try: match = self.client_url_regexp.search(path) if match is None: @@ -167,7 +181,44 @@ class main(object): q_cms = rpki.publication.cms_msg(DER = query) q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) q_cms.check_replay_sql(client, client.client_handle) - q_msg.serve_top_level(self, client, done) + if not q_msg.is_query(): + raise rpki.exceptions.BadQuery("Message type is not query") + r_msg = q_msg.__class__.reply() + delta = self.session.new_delta() + failed = False + for q_pdu in q_msg: + try: + if isinstance(q_pdu, rpki.publication.list_elt): + for obj in client.published_objects: + r_pdu = q_pdu.__class__() + r_pdu.tag = q_pdu.tag + r_pdu.uri = obj.uri + r_pdu.hash = obj.hash + r_msg.append(r_pdu) + else: + q_pdu.gctx = self + q_pdu.client = client + q_pdu.client.check_allowed_uri(q_pdu.uri) + q_pdu.serve_action(delta) + r_pdu = q_pdu.__class__() + r_pdu.tag = q_pdu.tag + r_pdu.uri = q_pdu.uri + r_msg.append(r_pdu) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + if not isinstance(e, rpki.exceptions.NotFound): + logger.exception("Exception processing PDU %r", q_pdu) + r_msg.append(rpki.publication.report_error_elt.from_exception(e, q_pdu.tag)) + delta.sql_delete() + failed = True + # + # Need to check "failed" flag here? + # + delta.activate() + self.sql.sweep() + cb(code = 200, + body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) except (rpki.async.ExitNow, SystemExit): raise except Exception, e: @@ -181,17 +232,13 @@ class session_obj(rpki.sql.sql_persistent): An RRDP session. """ - # We probably need additional columns or an additional table to - # handle cleanup of old serial numbers. Not sure quite what these - # would look like, other than that the SQL datatypes are probably - # BIGINT and DATETIME. Maybe a table to track time at which we - # retired a particular serial number, or, to save us the arithmetic, - # the corresponding cleanup time? - sql_template = rpki.sql.template( "session", "session_id", - "uuid") + "uuid", + "serial", + "snapshot", + "hash") def __repr__(self): return rpki.log.log_repr(self, self.uuid, self.serial) @@ -205,9 +252,11 @@ class session_obj(rpki.sql.sql_persistent): self = cls.sql_fetch(gctx, 1) if self is None: self = cls() + self.uuid = str(uuid.uuid4()) + self.serial = 0 + self.snapshot = None + self.hash = None self.gctx = gctx - self.session_id = 1 - self.uuid = uuid.uuid4() self.sql_store() return self @@ -216,155 +265,143 @@ class session_obj(rpki.sql.sql_persistent): return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,)) @property - def snapshots(self): - return snapshot_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,)) + def deltas(self): + return delta_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,)) - @property - def current_snapshot(self): - return snapshot_obj.sql_fetch_where1(self.gctx, - "session_id = %s AND activated IS NOT NULL AND expires IS NULL", - (self.session_id,)) - - def new_snapshot(self): - return snapshot_obj.create(self) - - def activate_snapshot(self, new_snapshot): - now = rpki.sundial.now() - old_snapshot = self.current_snapshot - if old_snapshot is not None: - old_snapshot.expires = now + self.gctx.rrdp_expiration_interval - old_snapshot.sql_store() - new_snapshot.activated = now - new_snapshot.sql_store() - - def expire_snapshots(self): - for snapshot in snapshot_obj.sql_fetch_where(self.gctx, - "session_id = %s AND expires IS NOT NULL AND expires < %s", - (self.session_id, rpki.sundial.now())): - snapshot.sql_delete() + def new_delta(self): + return delta_obj.create(self) - def write_notification(self): + def expire_deltas(self): + for delta in delta_obj.sql_fetch_where(self.gctx, + "session_id = %s AND expires IS NOT NULL AND expires < %s", + (self.session_id, rpki.sundial.now())): + delta.sql_delete() + + def write_rrdp_file(self, fn, xml, overwrite = False): """ - Write current notification file to disk. + Save an RRDP XML object to disk. """ - serial = self.current_shapshot.serial - fn = "%s/notification.xml" % self.uuid - - xml = Element(rrdp_namespace + "notification", - version = rrdp_version, - session_id = uuid, - serial = serial) - - SubElement(xml, rrdp_namespace + "snapshot", - uri = "%s/%s/snapshot/%d.xml" % (self.rrdp_uri_base, self.uuid, serial), - hash = um_where_do_we_store_this) - - for delta in some_sql_query_here(): - SubElement(xml, rrdp_namespace + "delta", - from = delta.from_serial, - to = delta.to_serial, - uri = delta.uri, - hash = delta.hash) - + if not overwrite and os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)): + raise SomeExceptionOrAnother("%s already exists, not regenerating" % fn) rpki.relaxng.rrdp.assertValid(xml) - tn = os.path.join(self.rrdp_publication_base, fn + ".%s.tmp" % os.getpid()) + tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid()) if not os.path.isdir(os.path.dirname(tn)): os.makedirs(os.path.dirname(tn)) ElementTree(xml).write(tn) - os.rename(tn, os.path.join(self.rrdp_publication_base, fn)) + os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn)) - - def write_snapshot(self): + def generate_snapshot(self): """ - Write current RRDP snapshot to disk. + Generate an XML snapshot of this session. """ - serial = self.current_shapshot.serial - fn = "%s/snapshot/%d.xml" % (self.uuid, serial) - - if os.path.exists(os.path.join(self.rrdp_publication_base, fn)): - logger.warning("Snapshot %s already exists, this is suprising, not regenerating") - return - - xml = Element(rrdp_namespace + "snapshot", version = rrdp_version, session_id = uuid, serial = serial) + xml = Element(rrdp_namespace + "snapshot", version = rrdp_version, session_id = self.uuid, serial = self.serial) + for obj in self.objects: + Base64SubElement(xml, rrdp_namespace + "publish", obj, uri = obj.uri) + self.snapshot = ElementToString(xml, pretty_print = True) + self.hash = rpki.x509.sha256(self.snapshot).encode("hex") + self.sql_mark_dirty() - for obj in object_obj.sql_fetch_where(self.gctx, "session_id = %s AND withdrawn_snapshot_id IS NULL", - (self.session_id,)): - se = SubElement(xml, rrdp_namespace + "publish", uri = obj.uri) - se.text = "\n" + obj.get_Base64() - se.tail = "\n" + def write_notification(self): + """ + Write current notification file to disk. + """ + xml = Element(rrdp_namespace + "notification", + version = rrdp_version, + session_id = self.uuid, + serial = self.serial) + SubElement(xml, rrdp_namespace + "snapshot", + uri = "%s/%s/snapshot/%d.xml" % (self.gctx.rrdp_uri_base, self.uuid, self.serial), + hash = self.hash) + for delta in self.deltas: + SubElement(xml, rrdp_namespace + "delta", + { "from" : str(delta.serial - 1), + "to" : str(delta.serial), + "uri" : delta.uri, + "hash" : delta.hash }) rpki.relaxng.rrdp.assertValid(xml) + self.write_rrdp_file("%s/notification.xml" % self.uuid, xml, overwrite = True) - tn = os.path.join(self.rrdp_publication_base, fn + ".%s.tmp" % os.getpid()) - if not os.path.isdir(os.path.dirname(tn)): - os.makedirs(os.path.dirname(tn)) - ElementTree(xml).write(tn) - os.rename(tn, os.path.join(self.rrdp_publication_base, fn)) - -class snapshot_obj(rpki.sql.sql_persistent): +class delta_obj(rpki.sql.sql_persistent): """ - An RRDP session snapshot. + An RRDP delta. """ sql_template = rpki.sql.template( - "snapshot", - "snapshot_id", - ("activated", rpki.sundial.datetime), - ("expires", rpki.sundial.datetime), - "session_id") + "delta", + "delta_id", + "session_id", + "serial", + "xml", + "hash", + ("expires", rpki.sundial.datetime)) @property @rpki.sql.cache_reference def session(self): return session_obj.sql_fetch(self.gctx, self.session_id) + # a) Consider making this __init__() instead, just make session optional. + # + # b) We need some place to store the changes that go into this delta + # while running .publish() and .withdraw(); need not be a field + # that goes in SQL, a list that we only use while constructing + # would be fine. + # + # c) This probably means that we shouldn't even try to put this into + # SQL until we're done building it, which is probably OK. + # @classmethod def create(cls, session): self = cls() + session.serial += 1 + session.sql_mark_dirty() self.gctx = session.gctx self.session_id = session.session_id - self.activated = None - self.expires = None - self.sql_store() + self.serial = session.serial + self.xml = None + self.hash = None + self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval + self.deltas = Element(rrdp_namespace + "deltas", + to = str(self.serial), + version = rrdp_version, + session_id = session.uuid) + self.deltas.set("from", str(self.serial - 1)) + SubElement(self.deltas, rrdp_namespace + "delta", serial = str(self.serial)) return self - @property - def serial(self): - """ - I know that using an SQL ID for any other purpose is usually a bad - idea, but in this case it has exactly the right properties, and we - really do want both the autoincrement behavior and the foreign key - behavior to tie to the snapshot serial numbers. So risk it. - - Well, OK, only almost the right properties. auto-increment - probably does not back up if we ROLLBACK, which could leave gaps - in the sequence. So may need to rework this, eg, to use a serial - field in the session object. Ignore the issue until we have the - rest of this working. - """ - - return self.snapshot_id + def activate(self): + rpki.relaxng.rrdp.assertValid(self.deltas) + self.xml = ElementToString(self.deltas, pretty_print = True) + self.hash = rpki.x509.sha256(self.xml).encode("hex") + del self.deltas + self.sql_mark_dirty() def publish(self, client, obj, uri, hash): if hash is not None: self.withdraw(client, uri, hash) - if object_obj.current_object_at_uri(client, self, uri) is not None: + elif object_obj.current_object_at_uri(client, self, uri) is not None: raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri) logger.debug("Publishing %s", uri) - return object_obj.create(client, self, obj, uri) + object_obj.create(client, self, obj, uri) + se = Base64SubElement(self.deltas[0], rrdp_namespace + "publish", obj, uri = uri) + if hash is not None: + se.set("hash", hash) + rpki.relaxng.rrdp.assertValid(self.deltas) def withdraw(self, client, uri, hash): obj = object_obj.current_object_at_uri(client, self, uri) if obj is None: raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri) if obj.hash != hash: - raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (%s, %s)" % (uri, obj.hash, hash)) + raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash)) logger.debug("Withdrawing %s", uri) obj.delete(self) - + SubElement(self.deltas[0], rrdp_namespace + "withdraw", uri = uri, hash = hash) + rpki.relaxng.rrdp.assertValid(self.deltas) class object_obj(rpki.sql.sql_persistent): @@ -378,13 +415,11 @@ class object_obj(rpki.sql.sql_persistent): "uri", "hash", "payload", - "published_snapshot_id", - "withdrawn_snapshot_id", "client_id", "session_id") def __repr__(self): - return rpki.log.log_repr(self, self.uri, self.published_snapshot_id, self.withdrawn_snapshot_id) + return rpki.log.log_repr(self, self.uri) @property @rpki.sql.cache_reference @@ -397,27 +432,23 @@ class object_obj(rpki.sql.sql_persistent): return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id) @classmethod - def create(cls, client, snapshot, obj, uri): + def create(cls, client, delta, obj, uri): self = cls() - self.gctx = snapshot.gctx + self.gctx = delta.gctx self.uri = uri self.payload = obj self.hash = rpki.x509.sha256(obj.get_DER()).encode("hex") - logger.debug("Computed hash %s of %r", self.hash, obj) - self.published_snapshot_id = snapshot.snapshot_id - self.withdrawn_snapshot_id = None - self.session_id = snapshot.session_id + logger.debug("Computed hash %s for %r", self.hash, obj) + self.session_id = delta.session_id self.client_id = client.client_id self.sql_mark_dirty() return self - def delete(self, snapshot): - self.withdrawn_snapshot_id = snapshot.snapshot_id - #self.sql_mark_dirty() - self.sql_store() + def delete(self, delta): + self.sql_mark_deleted() @classmethod - def current_object_at_uri(cls, client, snapshot, uri): + def current_object_at_uri(cls, client, delta, uri): return cls.sql_fetch_where1(client.gctx, - "session_id = %s AND client_id = %s AND withdrawn_snapshot_id IS NULL AND uri = %s", - (snapshot.session_id, client.client_id, uri)) + "session_id = %s AND client_id = %s AND uri = %s", + (delta.session_id, client.client_id, uri)) diff --git a/rpki/publication.py b/rpki/publication.py index c09f4895..f619d0d9 100644 --- a/rpki/publication.py +++ b/rpki/publication.py @@ -58,25 +58,6 @@ class base_publication_elt(rpki.xml_utils.base_elt, publication_namespace): def __repr__(self): return rpki.log.log_repr(self, self.tag, self.uri, self.hash, self.payload) - def serve_dispatch(self, r_msg, snapshot, cb, eb): - """ - Action dispatch handler. - """ - - try: - self.client.check_allowed_uri(self.uri) - self.serve_action(snapshot) - r_pdu = self.__class__() - r_pdu.tag = self.tag - r_pdu.uri = self.uri - r_msg.append(r_pdu) - cb() - except rpki.exceptions.NoObjectAtURI, e: - # This can happen when we're cleaning up from a prior mess, so - # we generate a <report_error/> PDU then carry on. - r_msg.append(report_error_elt.from_exception(e, self.tag)) - cb() - def uri_to_filename(self): """ Convert a URI to a local filename. @@ -128,13 +109,13 @@ class publish_elt(base_publication_elt): elt.text = self.payload.get_Base64() return elt - def serve_action(self, snapshot): + def serve_action(self, delta): """ Publish an object. """ logger.info("Publishing %s", self.payload.tracking_data(self.uri)) - snapshot.publish(self.client, self.payload, self.uri, self.hash) + delta.publish(self.client, self.payload, self.uri, self.hash) filename = self.uri_to_filename() filename_tmp = filename + ".tmp" dirname = os.path.dirname(filename) @@ -152,13 +133,13 @@ class withdraw_elt(base_publication_elt): element_name = "withdraw" - def serve_action(self, snapshot): + def serve_action(self, delta): """ Withdraw an object, then recursively delete empty directories. """ logger.info("Withdrawing %s", self.uri) - snapshot.withdraw(self.client, self.uri, self.hash) + delta.withdraw(self.client, self.uri, self.hash) filename = self.uri_to_filename() try: os.remove(filename) @@ -183,17 +164,7 @@ class list_elt(base_publication_elt): <list/> element. """ - def serve_dispatch(self, r_msg, snapshot, cb, eb): - """ - Action dispatch handler. - """ - - for obj in self.client.published_objects: - r_pdu = self.__class__() - r_pdu.tag = self.tag - r_pdu.uri = obj.uri - r_pdu.hash = obj.hash - r_msg.append(r_pdu) + pass class report_error_elt(rpki.xml_utils.text_elt, publication_namespace): @@ -257,40 +228,6 @@ class msg(rpki.xml_utils.msg, publication_namespace): # Dispatch table of PDUs for this protocol. pdus = dict((x.element_name, x) for x in (publish_elt, withdraw_elt, report_error_elt)) - def serve_top_level(self, gctx, client, cb): - """ - Serve one msg PDU. - """ - - if not self.is_query(): - raise rpki.exceptions.BadQuery("Message type is not query") - r_msg = self.__class__.reply() - snapshot = gctx.session.new_snapshot() if len(self) > 0 else None - - def loop(iterator, q_pdu): - - def fail(e): - if not isinstance(e, rpki.exceptions.NotFound): - logger.exception("Exception processing PDU %r", q_pdu) - r_msg.append(report_error_elt.from_exception(e, q_pdu.tag)) - snapshot.sql_delete() - cb(r_msg) - - try: - q_pdu.gctx = gctx - q_pdu.client = client - q_pdu.serve_dispatch(r_msg, snapshot, iterator, fail) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - fail(e) - - def done(): - gctx.session.activate_snapshot(snapshot) - cb(r_msg) - - rpki.async.iterator(self, loop, done) - class sax_handler(rpki.xml_utils.sax_handler): """ diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 45044ab8..b5044b2a 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -2539,7 +2539,7 @@ class publication_queue(object): logger.debug("Removing publication duplicate %r", self.uris[uri]) self.msgs[rid].remove(self.uris.pop(uri)) - hash = None if old_obj is None else rpki.x509.sha256(old_obj.get_Base64()).encode("hex") + hash = None if old_obj is None else rpki.x509.sha256(old_obj.get_DER()).encode("hex") if new_obj is None: pdu = rpki.publication.withdraw_elt.make_pdu(uri = uri, hash = hash) diff --git a/rpki/sql_schemas.py b/rpki/sql_schemas.py index 93909c02..e06d42e7 100644 --- a/rpki/sql_schemas.py +++ b/rpki/sql_schemas.py @@ -266,18 +266,19 @@ pubd = '''-- $Id: pubd.sql 5896 2014-07-15 19:34:32Z sra $ -- SQL objects needed by pubd.py. +-- Old tables that should just be flushed if present at all. + +DROP TABLE IF EXISTS config; +DROP TABLE IF EXISTS snapshot; + -- DROP TABLE commands must be in correct (reverse dependency) order -- to satisfy FOREIGN KEY constraints. DROP TABLE IF EXISTS object; -DROP TABLE IF EXISTS snapshot; +DROP TABLE IF EXISTS delta; DROP TABLE IF EXISTS session; DROP TABLE IF EXISTS client; --- An old table that should just be flushed if present at all. - -DROP TABLE IF EXISTS config; - CREATE TABLE client ( client_id SERIAL NOT NULL, client_handle VARCHAR(255) NOT NULL, @@ -292,17 +293,22 @@ CREATE TABLE client ( CREATE TABLE session ( session_id SERIAL NOT NULL, uuid VARCHAR(36) NOT NULL, + serial BIGINT UNSIGNED NOT NULL, + snapshot TEXT, + hash CHAR(64), PRIMARY KEY (session_id), UNIQUE (uuid) ) ENGINE=InnoDB; -CREATE TABLE snapshot ( - snapshot_id SERIAL NOT NULL, - activated DATETIME, - expires DATETIME, +CREATE TABLE delta ( + delta_id SERIAL NOT NULL, + serial BIGINT UNSIGNED NOT NULL, + xml TEXT NOT NULL, + hash CHAR(64) NOT NULL, + expires DATETIME NOT NULL, session_id BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (snapshot_id), - CONSTRAINT snapshot_session_id + PRIMARY KEY (delta_id), + CONSTRAINT delta_session_id FOREIGN KEY (session_id) REFERENCES session (session_id) ON DELETE CASCADE ) ENGINE=InnoDB; @@ -311,15 +317,9 @@ CREATE TABLE object ( uri VARCHAR(255) NOT NULL, hash CHAR(64) NOT NULL, payload LONGBLOB NOT NULL, - published_snapshot_id BIGINT UNSIGNED, - withdrawn_snapshot_id BIGINT UNSIGNED, client_id BIGINT UNSIGNED NOT NULL, session_id BIGINT UNSIGNED NOT NULL, PRIMARY KEY (object_id), - CONSTRAINT object_published_snapshot_id - FOREIGN KEY (published_snapshot_id) REFERENCES snapshot (snapshot_id) ON DELETE SET NULL, - CONSTRAINT object_withdrawn_snapshot_id - FOREIGN KEY (withdrawn_snapshot_id) REFERENCES snapshot (snapshot_id) ON DELETE CASCADE, CONSTRAINT object_client_id FOREIGN KEY (client_id) REFERENCES client (client_id) ON DELETE CASCADE, CONSTRAINT object_session_id |