aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r--rpki/rpkid.py442
1 files changed, 216 insertions, 226 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index c6b1001e..896fe0be 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -28,20 +28,27 @@ import random
import logging
import argparse
+import tornado.gen
+import tornado.web
+import tornado.ioloop
+import tornado.httputil
+import tornado.httpclient
+import tornado.httpserver
+
+from lxml.etree import Element, SubElement, tostring as ElementToString
+
import rpki.resource_set
import rpki.up_down
import rpki.left_right
import rpki.x509
-import rpki.http
import rpki.config
import rpki.exceptions
import rpki.relaxng
import rpki.log
-import rpki.async
import rpki.daemonize
+
import rpki.rpkid_tasks
-from lxml.etree import Element, SubElement, tostring as ElementToString
logger = logging.getLogger(__name__)
@@ -105,12 +112,10 @@ class main(object):
logger.info("Running in profile mode with output to %s", self.profile)
logger.debug("Initializing Django")
-
import django
django.setup()
logger.debug("Initializing rpkidb...")
-
global rpki # pylint: disable=W0602
import rpki.rpkidb # pylint: disable=W0621
@@ -127,41 +132,46 @@ class main(object):
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
- self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/")
-
self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
self.cfg.getint("initial-delay-max", 120))
# Should be much longer in production
- self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120))
- self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0))
- if not self.cron_keepalive:
- self.cron_keepalive = self.cron_period * 4
- self.cron_timeout = None
+ self.cron_period = self.cfg.getint("cron-period", 120)
- self.start_cron()
+ if self.use_internal_cron:
+ logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay)
+ tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop)
- rpki.http.server(
- host = self.http_server_host,
- port = self.http_server_port,
- handlers = (("/left-right", self.left_right_handler),
- ("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types),
- ("/cronjob", self.cronjob_handler)))
+ rpkid = self
- def start_cron(self):
- """
- Start clock for rpkid's internal cron process.
- """
+ class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.left_right_handler(self)
+
+ class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self, tenant_handle, child_handle):
+ yield rpkid.up_down_handler(self, tenant_handle, child_handle)
+
+ class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.cronjob_handler(self)
+
+ application = tornado.web.Application((
+ (r"/left-right", LeftRightHandler),
+ (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler),
+ (r"/cronjob", CronjobHandler)))
+
+ application.listen(
+ address = self.http_server_host,
+ port = self.http_server_port)
+
+ tornado.ioloop.IOLoop.current().start()
- if self.use_internal_cron:
- self.cron_timer = rpki.async.timer(handler = self.cron)
- when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay)
- logger.debug("Scheduling initial cron pass at %s", when)
- self.cron_timer.set(when)
- else:
- logger.debug("Not using internal clock, start_cron() call ignored")
@staticmethod
def _compose_left_right_query():
@@ -172,70 +182,86 @@ class main(object):
return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "query", version = rpki.left_right.version)
- def irdb_query(self, q_msg, callback, errback):
+
+ @tornado.gen.coroutine
+ def irdb_query(self, q_msg):
"""
Perform an IRDB callback query.
"""
- try:
- q_tags = set(q_pdu.tag for q_pdu in q_msg)
+ q_tags = set(q_pdu.tag for q_pdu in q_msg)
- q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
+ q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
- def unwrap(r_der):
- try:
- r_cms = rpki.left_right.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
- self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
- #rpki.left_right.check_response(r_msg)
- if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
- raise rpki.exceptions.BadIRDBReply(
- "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
- callback(r_msg)
- except Exception, e:
- errback(e)
+ http_client = tornado.httpclient.AsyncHTTPClient()
- rpki.http.client(
- url = self.irdb_url,
- msg = q_der,
- callback = unwrap,
- errback = errback)
+ http_request = tornado.httpclient.HTTPRequest(
+ url = self.irdb_url,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.left_right.content_type })
- except Exception, e:
- errback(e)
+ http_response = yield http_client.fetch(http_request)
+
+ # Tornado already checked http_response.code for us
+
+ content_type = http_response.headers.get("Content-Type")
+
+ if content_type not in rpki.left_right.allowed_content_types:
+ raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type))
+
+ r_der = http_response.body
+
+ r_cms = rpki.left_right.cms_msg(DER = r_der)
+ r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
+ self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
- def irdb_query_child_resources(self, tenant_handle, child_handle, callback, errback):
+ #rpki.left_right.check_response(r_msg)
+
+ if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
+ raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
+
+ raise tornado.gen.Return(r_msg)
+
+
+ @tornado.gen.coroutine
+ def irdb_query_child_resources(self, tenant_handle, child_handle):
"""
Ask IRDB about a child's resources.
"""
q_msg = self._compose_left_right_query()
- SubElement(q_msg, rpki.left_right.tag_list_resources,
- tenant_handle = tenant_handle, child_handle = child_handle)
+ SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle)
+
+ r_msg = yield self.irdb_query(q_msg)
- def done(r_msg):
- if len(r_msg) != 1:
- raise rpki.exceptions.BadIRDBReply(
- "Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
- callback(rpki.resource_set.resource_bag(
- asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
- v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
- v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
- valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))))
+ if len(r_msg) != 1:
+ raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
- self.irdb_query(q_msg, done, errback)
+ bag = rpki.resource_set.resource_bag(
+ asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
+ v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
+ v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
+ valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))
- def irdb_query_roa_requests(self, tenant_handle, callback, errback):
+ raise tornado.gen.Return(bag)
+
+
+ @tornado.gen.coroutine
+ def irdb_query_roa_requests(self, tenant_handle):
"""
Ask IRDB about self's ROA requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
+
- def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles, callback, errback):
+ @tornado.gen.coroutine
+ def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles):
"""
Ask IRDB about self's ghostbuster record requests.
"""
@@ -244,16 +270,20 @@ class main(object):
for parent_handle in parent_handles:
SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests,
tenant_handle = tenant_handle, parent_handle = parent_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
- def irdb_query_ee_certificate_requests(self, tenant_handle, callback, errback):
+ @tornado.gen.coroutine
+ def irdb_query_ee_certificate_requests(self, tenant_handle):
"""
Ask IRDB about self's EE certificate requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
+
@property
def left_right_models(self):
@@ -273,6 +303,7 @@ class main(object):
rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
return self._left_right_models
+
@property
def left_right_trivial_handlers(self):
"""
@@ -287,6 +318,7 @@ class main(object):
rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
return self._left_right_trivial_handlers
+
def handle_list_published_objects(self, q_pdu, r_msg):
"""
<list_published_objects/> server.
@@ -317,6 +349,7 @@ class main(object):
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = c.uri, **kw).text = c.cert.get_Base64()
+
def handle_list_received_resources(self, q_pdu, r_msg):
"""
<list_received_resources/> server.
@@ -344,28 +377,28 @@ class main(object):
r_pdu.set("tag", msg_tag)
- def left_right_handler(self, query, path, cb):
+ @tornado.gen.coroutine
+ def left_right_handler(self, handler):
"""
- Process one left-right PDU.
+ Process one left-right message.
"""
- # This handles five persistent classes (self, bsc, parent, child,
- # repository) and two simple queries (list_published_objects and
- # list_received_resources). The former probably need to dispatch
- # via methods to the corresponding model classes; the latter
- # probably just become calls to ordinary methods of this
- # (rpki.rpkid.main) class.
- #
- # Need to clone logic from rpki.pubd.main.control_handler().
-
logger.debug("Entering left_right_handler()")
+ content_type = handler.request.headers["Content-Type"]
+ if content_type not in rpki.left_right.allowed_content_types:
+ handler.set_status(415, "No handler for Content-Type %s" % content_type)
+ handler.finish()
+ raise tornado.gen.Return
+
+ handler.set_header("Content-Type", rpki.left_right.content_type)
+
try:
- q_cms = rpki.left_right.cms_msg(DER = query)
+ q_cms = rpki.left_right.cms_msg(DER = handler.request.body)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "reply", version = rpki.left_right.version)
- self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path)
+ self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path)
assert q_msg.tag.startswith(rpki.left_right.xmlns)
assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg)
@@ -376,14 +409,37 @@ class main(object):
if q_msg.get("type") != "query":
raise rpki.exceptions.BadQuery("Message type is not query")
- def done():
- cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ for q_pdu in q_msg:
- def loop(iterator, q_pdu):
+ try:
+ action = q_pdu.get("action")
+ model = self.left_right_models.get(q_pdu.tag)
+
+ if q_pdu.tag in self.left_right_trivial_handlers:
+ self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
+
+ elif action in ("get", "list"):
+ for obj in model.objects.xml_list(q_pdu):
+ obj.xml_template.encode(obj, q_pdu, r_msg)
+
+ elif action == "destroy":
+ obj = model.objects.xml_get_for_delete(q_pdu)
+ yield obj.xml_pre_delete_hook(self)
+ obj.delete()
+ obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- logger.debug("left_right_handler():loop(%r)", q_pdu)
+ elif action in ("create", "set"):
+ obj = model.objects.xml_get_or_create(q_pdu)
+ obj.xml_template.decode(obj, q_pdu)
+ obj.xml_pre_save_hook(q_pdu)
+ obj.save()
+ yield obj.xml_post_save_hook(self, q_pdu)
+ obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- def fail(e):
+ else:
+ raise rpki.exceptions.BadQuery("Unrecognized action %r" % action)
+
+ except Exception, e:
if not isinstance(e, rpki.exceptions.NotFound):
logger.exception("Unhandled exception serving left-right PDU %r", q_pdu)
error_tenant_handle = q_pdu.get("tenant_handle")
@@ -394,102 +450,50 @@ class main(object):
r_pdu.set("tag", error_tag)
if error_tenant_handle is not None:
r_pdu.set("tenant_handle", error_tenant_handle)
- cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ break
- try:
- if q_pdu.tag in self.left_right_trivial_handlers:
- logger.debug("left_right_handler(): trivial handler")
- self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
- iterator()
-
- else:
- action = q_pdu.get("action")
- model = self.left_right_models[q_pdu.tag]
-
- logger.debug("left_right_handler(): action %s model %r", action, model)
-
- if action in ("get", "list"):
- logger.debug("left_right_handler(): get/list")
- for obj in model.objects.xml_list(q_pdu):
- logger.debug("left_right_handler(): get/list: encoding %r", obj)
- obj.xml_template.encode(obj, q_pdu, r_msg)
- iterator()
-
- elif action == "destroy":
- def destroy_cb():
- obj.delete()
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- iterator()
- logger.debug("left_right_handler(): destroy")
- obj = model.objects.xml_get_for_delete(q_pdu)
- obj.xml_pre_delete_hook(self, destroy_cb, fail)
-
- elif action in ("create", "set"):
- def create_set_cb():
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- iterator()
- logger.debug("left_right_handler(): create/set")
- obj = model.objects.xml_get_or_create(q_pdu)
- obj.xml_template.decode(obj, q_pdu)
- obj.xml_pre_save_hook(q_pdu)
- obj.save()
- obj.xml_post_save_hook(self, q_pdu, create_set_cb, fail)
-
- else:
- raise rpki.exceptions.BadQuery
-
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- fail(e)
-
- rpki.async.iterator(q_msg, loop, done)
-
- except (rpki.async.ExitNow, SystemExit):
- raise
+ handler.set_status(200)
+ handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ logger.debug("Normal exit from left_right_handler()")
except Exception, e:
logger.exception("Unhandled exception serving left-right request")
- cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+ handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+ handler.finish()
- up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I)
- def up_down_handler(self, q_der, path, cb):
+ @tornado.gen.coroutine
+ def up_down_handler(self, handler, tenant_handle, child_handle):
"""
Process one up-down PDU.
"""
- def done(r_der):
- cb(200, body = r_der)
+ logger.debug("Entering up_down_handler()")
+
+ content_type = handler.request.headers["Content-Type"]
+ if content_type not in rpki.up_down.allowed_content_types:
+ handler.set_status(415, "No handler for Content-Type %s" % content_type)
+ handler.finish()
+ raise tornado.gen.Return
try:
- match = self.up_down_url_regexp.search(path)
- if match is None:
- raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path)
- tenant_handle, child_handle = match.groups()
- try:
- child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
- except rpki.rpkidb.models.Child.DoesNotExist:
- raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % (
- child_handle, tenant_handle))
- child.serve_up_down(self, q_der, done)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e:
- logger.warning(str(e))
- cb(400, reason = str(e))
+ child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
+ q_der = handler.request.body
+ r_der = yield child.serve_up_down(self, q_der)
+ handler.set_header("Content-Type", rpki.up_down.content_type)
+ handler.set_status(200)
+ handler.finish(r_der)
+
+ except rpki.rpkidb.models.Child.DoesNotExist:
+ logger.info("Child %r of tenant %r not found", child_handle, tenant_handle)
+ handler.set_status(400, "Child %r not found" % child_handle)
+ handler.finish()
+
except Exception, e:
logger.exception("Unhandled exception processing up-down request")
- cb(400, reason = "Could not process PDU: %s" % e)
-
- def checkpoint(self, force = False):
- """
- Record that we were still alive when we got here, by resetting
- keepalive timer.
- """
+ handler.set_status(400, "Could not process PDU: %s" % e)
+ handler.finish()
- if force or self.cron_timeout is not None:
- self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
def task_add(self, task):
"""
@@ -504,11 +508,10 @@ class main(object):
logger.debug("Task %r was already in the task queue", task)
return False
+
def task_next(self):
"""
- Pull next task from the task queue and put it the deferred event
- queue (we don't want to run it directly, as that could eventually
- blow out our call stack).
+ Schedule next task in the queue to be run.
"""
try:
@@ -516,77 +519,67 @@ class main(object):
except IndexError:
self.task_current = None
else:
- rpki.async.event_defer(self.task_current)
+ tornado.ioloop.IOLoop.current().add_callback(self.task_current)
+
def task_run(self):
"""
- Run first task on the task queue, unless one is running already.
+ Schedule first queued task unless a task is running already.
"""
if self.task_current is None:
self.task_next()
- def cron(self, cb = None):
+
+ @tornado.gen.coroutine
+ def cron_loop(self):
"""
- Periodic tasks.
+ Asynchronous infinite loop to drive cron cycle.
"""
- now = rpki.sundial.now()
+ assert self.use_internal_cron
+ yield tornado.gen.sleep(self.initial_delay)
+ while True:
+ yield self.cron_run()
+ yield tornado.gen.sleep(self.cron_period)
- logger.debug("Starting cron run")
- def done():
- self.cron_timeout = None
- logger.info("Finished cron run started at %s", now)
- if cb is not None:
- cb()
+ @tornado.gen.coroutine
+ def cron_run(self):
+ """
+ Periodic tasks.
+ """
- completion = rpki.rpkid_tasks.CompletionHandler(done)
+ now = rpki.sundial.now()
+ logger.debug("Starting cron run")
+ futures = []
try:
- selves = rpki.rpkidb.models.Tenant.objects.all()
- except Exception:
- logger.exception("Error pulling selves from SQL, maybe SQL server is down?")
+ tenants = rpki.rpkidb.models.Tenant.objects.all()
+ except:
+ logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
else:
- for s in selves:
- s.schedule_cron_tasks(self, completion)
- nothing_queued = completion.count == 0
-
- assert self.use_internal_cron or self.cron_timeout is None
-
- if self.cron_timeout is not None and self.cron_timeout < now:
- logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout)
- self.cron_timeout = None
-
- if self.use_internal_cron:
- when = now + self.cron_period
- logger.debug("Scheduling next cron run at %s", when)
- self.cron_timer.set(when)
-
- if self.cron_timeout is None:
- self.checkpoint(self.use_internal_cron)
- self.task_run()
-
- elif self.use_internal_cron:
- logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout)
+ for tenant in tenants:
+ futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self))
+ if futures:
+ yield futures
+ logger.info("Finished cron run started at %s", now)
- if nothing_queued:
- done()
- def cronjob_handler(self, query, path, cb):
+ @tornado.gen.coroutine
+ def cronjob_handler(self, handler):
"""
External trigger for periodic tasks. This is somewhat obsolete
now that we have internal timers, but the test framework still
uses it.
"""
- def done():
- cb(200, body = "OK")
-
if self.use_internal_cron:
- cb(500, reason = "Running cron internally")
+ handler.set_status(500, "Running cron internally")
else:
logger.debug("Starting externally triggered cron")
- self.cron(done)
+ yield self.cron()
+ handler.set_status(200)
+ handler.finish()
class publication_queue(object):
@@ -661,19 +654,16 @@ class publication_queue(object):
if self.replace:
self.uris[uri] = pdu
- def call_pubd(self, cb, eb):
- def loop(iterator, rid):
+ @tornado.gen.coroutine
+ def call_pubd(self):
+ for rid in self.repositories:
logger.debug("Calling pubd[%r]", self.repositories[rid])
- self.repositories[rid].call_pubd(self.rpkid, iterator, eb, self.msgs[rid], self.handlers)
- def done():
- self.clear()
- cb()
- rpki.async.iterator(self.repositories, loop, done)
+ yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers)
+ self.clear()
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)
def empty(self):
- assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size)
return not self.msgs