aboutsummaryrefslogtreecommitdiff
path: root/rpki/pubd.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2014-07-17 17:26:12 +0000
committerRob Austein <sra@hactrn.net>2014-07-17 17:26:12 +0000
commit7ae2b2bb18685d757bbcca6c9e664170f1866486 (patch)
tree988c6a8bbd07c28249c30e237cc4ebc1a8104dec /rpki/pubd.py
parentd91c0b6c6a607e2c318818c1aeb1ac756bd0492d (diff)
Start unwinding the twisty maze of method calls in the old publication code.
svn path=/branches/tk705/; revision=5898
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r--rpki/pubd.py299
1 files changed, 165 insertions, 134 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py
index 8f5b2605..7d677f0a 100644
--- a/rpki/pubd.py
+++ b/rpki/pubd.py
@@ -40,10 +40,28 @@ import rpki.publication
import rpki.publication_control
import rpki.daemonize
-from lxml.etree import Element, SubElement, ElementTree, Comment
+from lxml.etree import Element, SubElement, ElementTree, Comment, tostring as ElementToString
logger = logging.getLogger(__name__)
+
+# Temporary, these should come from the schema or something
+rrdp_namespace = "{http://www.ripe.net/rpki/rrdp}"
+rrdp_version = "1"
+
+
+
+def Base64SubElement(elt, name, obj, attrib = None, **kwargs):
+ """
+ Convenience wrapper around SubElement for use with Base64 text.
+ """
+
+ se = SubElement(elt, name, attrib, **kwargs)
+ se.text = "\n" + obj.get_Base64()
+ se.tail = "\n"
+ return se
+
+
class main(object):
"""
Main program for pubd.
@@ -109,8 +127,9 @@ class main(object):
self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False)
+ if False:
+ self.rrdp_uri_base = self.cfg.get("rrdp-uri-base")
self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h"))
- self.rrdp_uri_base = self.cfg.get("rrdp-uri-base")
self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", "rrdp-publication/")
self.session = session_obj.fetch(self)
@@ -151,11 +170,6 @@ class main(object):
Process one PDU from a client.
"""
- def done(r_msg):
- self.sql.sweep()
- cb(code = 200,
- body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
-
try:
match = self.client_url_regexp.search(path)
if match is None:
@@ -167,7 +181,44 @@ class main(object):
q_cms = rpki.publication.cms_msg(DER = query)
q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
q_cms.check_replay_sql(client, client.client_handle)
- q_msg.serve_top_level(self, client, done)
+ if not q_msg.is_query():
+ raise rpki.exceptions.BadQuery("Message type is not query")
+ r_msg = q_msg.__class__.reply()
+ delta = self.session.new_delta()
+ failed = False
+ for q_pdu in q_msg:
+ try:
+ if isinstance(q_pdu, rpki.publication.list_elt):
+ for obj in client.published_objects:
+ r_pdu = q_pdu.__class__()
+ r_pdu.tag = q_pdu.tag
+ r_pdu.uri = obj.uri
+ r_pdu.hash = obj.hash
+ r_msg.append(r_pdu)
+ else:
+ q_pdu.gctx = self
+ q_pdu.client = client
+ q_pdu.client.check_allowed_uri(q_pdu.uri)
+ q_pdu.serve_action(delta)
+ r_pdu = q_pdu.__class__()
+ r_pdu.tag = q_pdu.tag
+ r_pdu.uri = q_pdu.uri
+ r_msg.append(r_pdu)
+ except (rpki.async.ExitNow, SystemExit):
+ raise
+ except Exception, e:
+ if not isinstance(e, rpki.exceptions.NotFound):
+ logger.exception("Exception processing PDU %r", q_pdu)
+ r_msg.append(rpki.publication.report_error_elt.from_exception(e, q_pdu.tag))
+ delta.sql_delete()
+ failed = True
+ #
+ # Need to check "failed" flag here?
+ #
+ delta.activate()
+ self.sql.sweep()
+ cb(code = 200,
+ body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
@@ -181,17 +232,13 @@ class session_obj(rpki.sql.sql_persistent):
An RRDP session.
"""
- # We probably need additional columns or an additional table to
- # handle cleanup of old serial numbers. Not sure quite what these
- # would look like, other than that the SQL datatypes are probably
- # BIGINT and DATETIME. Maybe a table to track time at which we
- # retired a particular serial number, or, to save us the arithmetic,
- # the corresponding cleanup time?
-
sql_template = rpki.sql.template(
"session",
"session_id",
- "uuid")
+ "uuid",
+ "serial",
+ "snapshot",
+ "hash")
def __repr__(self):
return rpki.log.log_repr(self, self.uuid, self.serial)
@@ -205,9 +252,11 @@ class session_obj(rpki.sql.sql_persistent):
self = cls.sql_fetch(gctx, 1)
if self is None:
self = cls()
+ self.uuid = str(uuid.uuid4())
+ self.serial = 0
+ self.snapshot = None
+ self.hash = None
self.gctx = gctx
- self.session_id = 1
- self.uuid = uuid.uuid4()
self.sql_store()
return self
@@ -216,155 +265,143 @@ class session_obj(rpki.sql.sql_persistent):
return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
@property
- def snapshots(self):
- return snapshot_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
+ def deltas(self):
+ return delta_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
- @property
- def current_snapshot(self):
- return snapshot_obj.sql_fetch_where1(self.gctx,
- "session_id = %s AND activated IS NOT NULL AND expires IS NULL",
- (self.session_id,))
-
- def new_snapshot(self):
- return snapshot_obj.create(self)
-
- def activate_snapshot(self, new_snapshot):
- now = rpki.sundial.now()
- old_snapshot = self.current_snapshot
- if old_snapshot is not None:
- old_snapshot.expires = now + self.gctx.rrdp_expiration_interval
- old_snapshot.sql_store()
- new_snapshot.activated = now
- new_snapshot.sql_store()
-
- def expire_snapshots(self):
- for snapshot in snapshot_obj.sql_fetch_where(self.gctx,
- "session_id = %s AND expires IS NOT NULL AND expires < %s",
- (self.session_id, rpki.sundial.now())):
- snapshot.sql_delete()
+ def new_delta(self):
+ return delta_obj.create(self)
- def write_notification(self):
+ def expire_deltas(self):
+ for delta in delta_obj.sql_fetch_where(self.gctx,
+ "session_id = %s AND expires IS NOT NULL AND expires < %s",
+ (self.session_id, rpki.sundial.now())):
+ delta.sql_delete()
+
+ def write_rrdp_file(self, fn, xml, overwrite = False):
"""
- Write current notification file to disk.
+ Save an RRDP XML object to disk.
"""
- serial = self.current_shapshot.serial
- fn = "%s/notification.xml" % self.uuid
-
- xml = Element(rrdp_namespace + "notification",
- version = rrdp_version,
- session_id = uuid,
- serial = serial)
-
- SubElement(xml, rrdp_namespace + "snapshot",
- uri = "%s/%s/snapshot/%d.xml" % (self.rrdp_uri_base, self.uuid, serial),
- hash = um_where_do_we_store_this)
-
- for delta in some_sql_query_here():
- SubElement(xml, rrdp_namespace + "delta",
- from = delta.from_serial,
- to = delta.to_serial,
- uri = delta.uri,
- hash = delta.hash)
-
+ if not overwrite and os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)):
+ raise SomeExceptionOrAnother("%s already exists, not regenerating" % fn)
rpki.relaxng.rrdp.assertValid(xml)
- tn = os.path.join(self.rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
+ tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
if not os.path.isdir(os.path.dirname(tn)):
os.makedirs(os.path.dirname(tn))
ElementTree(xml).write(tn)
- os.rename(tn, os.path.join(self.rrdp_publication_base, fn))
+ os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn))
-
- def write_snapshot(self):
+ def generate_snapshot(self):
"""
- Write current RRDP snapshot to disk.
+ Generate an XML snapshot of this session.
"""
- serial = self.current_shapshot.serial
- fn = "%s/snapshot/%d.xml" % (self.uuid, serial)
-
- if os.path.exists(os.path.join(self.rrdp_publication_base, fn)):
- logger.warning("Snapshot %s already exists, this is suprising, not regenerating")
- return
-
- xml = Element(rrdp_namespace + "snapshot", version = rrdp_version, session_id = uuid, serial = serial)
+ xml = Element(rrdp_namespace + "snapshot", version = rrdp_version, session_id = self.uuid, serial = self.serial)
+ for obj in self.objects:
+ Base64SubElement(xml, rrdp_namespace + "publish", obj, uri = obj.uri)
+ self.snapshot = ElementToString(xml, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.snapshot).encode("hex")
+ self.sql_mark_dirty()
- for obj in object_obj.sql_fetch_where(self.gctx, "session_id = %s AND withdrawn_snapshot_id IS NULL",
- (self.session_id,)):
- se = SubElement(xml, rrdp_namespace + "publish", uri = obj.uri)
- se.text = "\n" + obj.get_Base64()
- se.tail = "\n"
+ def write_notification(self):
+ """
+ Write current notification file to disk.
+ """
+ xml = Element(rrdp_namespace + "notification",
+ version = rrdp_version,
+ session_id = self.uuid,
+ serial = self.serial)
+ SubElement(xml, rrdp_namespace + "snapshot",
+ uri = "%s/%s/snapshot/%d.xml" % (self.gctx.rrdp_uri_base, self.uuid, self.serial),
+ hash = self.hash)
+ for delta in self.deltas:
+ SubElement(xml, rrdp_namespace + "delta",
+ { "from" : str(delta.serial - 1),
+ "to" : str(delta.serial),
+ "uri" : delta.uri,
+ "hash" : delta.hash })
rpki.relaxng.rrdp.assertValid(xml)
+ self.write_rrdp_file("%s/notification.xml" % self.uuid, xml, overwrite = True)
- tn = os.path.join(self.rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
- if not os.path.isdir(os.path.dirname(tn)):
- os.makedirs(os.path.dirname(tn))
- ElementTree(xml).write(tn)
- os.rename(tn, os.path.join(self.rrdp_publication_base, fn))
-
-class snapshot_obj(rpki.sql.sql_persistent):
+class delta_obj(rpki.sql.sql_persistent):
"""
- An RRDP session snapshot.
+ An RRDP delta.
"""
sql_template = rpki.sql.template(
- "snapshot",
- "snapshot_id",
- ("activated", rpki.sundial.datetime),
- ("expires", rpki.sundial.datetime),
- "session_id")
+ "delta",
+ "delta_id",
+ "session_id",
+ "serial",
+ "xml",
+ "hash",
+ ("expires", rpki.sundial.datetime))
@property
@rpki.sql.cache_reference
def session(self):
return session_obj.sql_fetch(self.gctx, self.session_id)
+ # a) Consider making this __init__() instead, just make session optional.
+ #
+ # b) We need some place to store the changes that go into this delta
+ # while running .publish() and .withdraw(); need not be a field
+ # that goes in SQL, a list that we only use while constructing
+ # would be fine.
+ #
+ # c) This probably means that we shouldn't even try to put this into
+ # SQL until we're done building it, which is probably OK.
+ #
@classmethod
def create(cls, session):
self = cls()
+ session.serial += 1
+ session.sql_mark_dirty()
self.gctx = session.gctx
self.session_id = session.session_id
- self.activated = None
- self.expires = None
- self.sql_store()
+ self.serial = session.serial
+ self.xml = None
+ self.hash = None
+ self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval
+ self.deltas = Element(rrdp_namespace + "deltas",
+ to = str(self.serial),
+ version = rrdp_version,
+ session_id = session.uuid)
+ self.deltas.set("from", str(self.serial - 1))
+ SubElement(self.deltas, rrdp_namespace + "delta", serial = str(self.serial))
return self
- @property
- def serial(self):
- """
- I know that using an SQL ID for any other purpose is usually a bad
- idea, but in this case it has exactly the right properties, and we
- really do want both the autoincrement behavior and the foreign key
- behavior to tie to the snapshot serial numbers. So risk it.
-
- Well, OK, only almost the right properties. auto-increment
- probably does not back up if we ROLLBACK, which could leave gaps
- in the sequence. So may need to rework this, eg, to use a serial
- field in the session object. Ignore the issue until we have the
- rest of this working.
- """
-
- return self.snapshot_id
+ def activate(self):
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+ self.xml = ElementToString(self.deltas, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.xml).encode("hex")
+ del self.deltas
+ self.sql_mark_dirty()
def publish(self, client, obj, uri, hash):
if hash is not None:
self.withdraw(client, uri, hash)
- if object_obj.current_object_at_uri(client, self, uri) is not None:
+ elif object_obj.current_object_at_uri(client, self, uri) is not None:
raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri)
logger.debug("Publishing %s", uri)
- return object_obj.create(client, self, obj, uri)
+ object_obj.create(client, self, obj, uri)
+ se = Base64SubElement(self.deltas[0], rrdp_namespace + "publish", obj, uri = uri)
+ if hash is not None:
+ se.set("hash", hash)
+ rpki.relaxng.rrdp.assertValid(self.deltas)
def withdraw(self, client, uri, hash):
obj = object_obj.current_object_at_uri(client, self, uri)
if obj is None:
raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri)
if obj.hash != hash:
- raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (%s, %s)" % (uri, obj.hash, hash))
+ raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
logger.debug("Withdrawing %s", uri)
obj.delete(self)
-
+ SubElement(self.deltas[0], rrdp_namespace + "withdraw", uri = uri, hash = hash)
+ rpki.relaxng.rrdp.assertValid(self.deltas)
class object_obj(rpki.sql.sql_persistent):
@@ -378,13 +415,11 @@ class object_obj(rpki.sql.sql_persistent):
"uri",
"hash",
"payload",
- "published_snapshot_id",
- "withdrawn_snapshot_id",
"client_id",
"session_id")
def __repr__(self):
- return rpki.log.log_repr(self, self.uri, self.published_snapshot_id, self.withdrawn_snapshot_id)
+ return rpki.log.log_repr(self, self.uri)
@property
@rpki.sql.cache_reference
@@ -397,27 +432,23 @@ class object_obj(rpki.sql.sql_persistent):
return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id)
@classmethod
- def create(cls, client, snapshot, obj, uri):
+ def create(cls, client, delta, obj, uri):
self = cls()
- self.gctx = snapshot.gctx
+ self.gctx = delta.gctx
self.uri = uri
self.payload = obj
self.hash = rpki.x509.sha256(obj.get_DER()).encode("hex")
- logger.debug("Computed hash %s of %r", self.hash, obj)
- self.published_snapshot_id = snapshot.snapshot_id
- self.withdrawn_snapshot_id = None
- self.session_id = snapshot.session_id
+ logger.debug("Computed hash %s for %r", self.hash, obj)
+ self.session_id = delta.session_id
self.client_id = client.client_id
self.sql_mark_dirty()
return self
- def delete(self, snapshot):
- self.withdrawn_snapshot_id = snapshot.snapshot_id
- #self.sql_mark_dirty()
- self.sql_store()
+ def delete(self, delta):
+ self.sql_mark_deleted()
@classmethod
- def current_object_at_uri(cls, client, snapshot, uri):
+ def current_object_at_uri(cls, client, delta, uri):
return cls.sql_fetch_where1(client.gctx,
- "session_id = %s AND client_id = %s AND withdrawn_snapshot_id IS NULL AND uri = %s",
- (snapshot.session_id, client.client_id, uri))
+ "session_id = %s AND client_id = %s AND uri = %s",
+ (delta.session_id, client.client_id, uri))