aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r--rpki/rpkid.py1218
1 files changed, 609 insertions, 609 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index da6141ea..c0ddbd58 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -56,681 +56,681 @@ logger = logging.getLogger(__name__)
class main(object):
- """
- Main program for rpkid.
- """
-
- def __init__(self):
-
- os.environ.update(TZ = "UTC",
- DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid")
- time.tzset()
-
- self.irdbd_cms_timestamp = None
- self.irbe_cms_timestamp = None
-
- self.task_queue = []
- self.task_event = tornado.locks.Event()
-
- self.http_client_serialize = weakref.WeakValueDictionary()
-
- parser = argparse.ArgumentParser(description = __doc__)
- parser.add_argument("-c", "--config",
- help = "override default location of configuration file")
- parser.add_argument("-f", "--foreground", action = "store_true",
- help = "do not daemonize")
- parser.add_argument("--pidfile",
- help = "override default location of pid file")
- parser.add_argument("--profile",
- help = "enable profiling, saving data to PROFILE")
- rpki.log.argparse_setup(parser)
- args = parser.parse_args()
-
- self.profile = args.profile
-
- rpki.log.init("rpkid", args)
-
- self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid")
- self.cfg.set_global_flags()
-
- if not args.foreground:
- rpki.daemonize.daemon(pidfile = args.pidfile)
-
- if self.profile:
- import cProfile
- prof = cProfile.Profile()
- try:
- prof.runcall(self.main)
- finally:
- prof.dump_stats(self.profile)
- logger.info("Dumped profile data to %s", self.profile)
- else:
- self.main()
-
- def main(self):
-
- startup_msg = self.cfg.get("startup-message", "")
- if startup_msg:
- logger.info(startup_msg)
-
- if self.profile:
- logger.info("Running in profile mode with output to %s", self.profile)
-
- logger.debug("Initializing Django")
- import django
- django.setup()
-
- logger.debug("Initializing rpkidb...")
- global rpki # pylint: disable=W0602
- import rpki.rpkidb # pylint: disable=W0621
-
- logger.debug("Initializing rpkidb...done")
-
- self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
- self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert"))
- self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
- self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert"))
- self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key"))
-
- self.irdb_url = self.cfg.get("irdb-url")
-
- self.http_server_host = self.cfg.get("server-host", "")
- self.http_server_port = self.cfg.getint("server-port")
-
- self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
-
- self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
- self.cfg.getint("initial-delay-max", 120))
-
- # Should be much longer in production
- self.cron_period = self.cfg.getint("cron-period", 120)
-
- if self.use_internal_cron:
- logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay)
- tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop)
-
- logger.debug("Scheduling task loop")
- tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop)
-
- rpkid = self
-
- class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223
- @tornado.gen.coroutine
- def post(self):
- yield rpkid.left_right_handler(self)
-
- class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223
- @tornado.gen.coroutine
- def post(self, tenant_handle, child_handle): # pylint: disable=W0221
- yield rpkid.up_down_handler(self, tenant_handle, child_handle)
-
- class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223
- @tornado.gen.coroutine
- def post(self):
- yield rpkid.cronjob_handler(self)
-
- application = tornado.web.Application((
- (r"/left-right", LeftRightHandler),
- (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler),
- (r"/cronjob", CronjobHandler)))
-
- application.listen(
- address = self.http_server_host,
- port = self.http_server_port)
-
- tornado.ioloop.IOLoop.current().start()
-
- def task_add(self, tasks):
"""
- Add zero or more tasks to the task queue.
+ Main program for rpkid.
"""
- for task in tasks:
- if task in self.task_queue:
- logger.debug("Task %r already queued", task)
- else:
- logger.debug("Adding %r to task queue", task)
- self.task_queue.append(task)
+ def __init__(self):
- def task_run(self):
- """
- Kick the task loop to notice recently added tasks.
- """
+ os.environ.update(TZ = "UTC",
+ DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid")
+ time.tzset()
- self.task_event.set()
+ self.irdbd_cms_timestamp = None
+ self.irbe_cms_timestamp = None
- @tornado.gen.coroutine
- def task_loop(self):
- """
- Asynchronous infinite loop to run background tasks.
+ self.task_queue = []
+ self.task_event = tornado.locks.Event()
- This code is a bit finicky, because it's managing a collection of
- Future objects which are running independently of the control flow
- here, and the wave function doesn't collapse until we do a yield.
+ self.http_client_serialize = weakref.WeakValueDictionary()
- So we keep this brutally simple and don't try to hide too much of
- it in the AbstractTask class. For similar reasons, AbstractTask
- sets aside a .future instance variable for this method's use.
- """
+ parser = argparse.ArgumentParser(description = __doc__)
+ parser.add_argument("-c", "--config",
+ help = "override default location of configuration file")
+ parser.add_argument("-f", "--foreground", action = "store_true",
+ help = "do not daemonize")
+ parser.add_argument("--pidfile",
+ help = "override default location of pid file")
+ parser.add_argument("--profile",
+ help = "enable profiling, saving data to PROFILE")
+ rpki.log.argparse_setup(parser)
+ args = parser.parse_args()
- logger.debug("Starting task loop")
- task_event_future = None
-
- while True:
- while None in self.task_queue:
- self.task_queue.remove(None)
-
- futures = []
- for task in self.task_queue:
- if task.future is None:
- task.future = task.start()
- futures.append(task.future)
- if task_event_future is None:
- task_event_future = self.task_event.wait()
- futures.append(task_event_future)
- iterator = tornado.gen.WaitIterator(*futures)
-
- while not iterator.done():
- yield iterator.next()
- if iterator.current_future is task_event_future:
- self.task_event.clear()
- task_event_future = None
- break
- else:
- task = self.task_queue[iterator.current_index]
- task.future = None
- waiting = task.waiting()
- if not waiting:
- self.task_queue[iterator.current_index] = None
- for task in self.task_queue:
- if task is not None and not task.runnable.is_set():
- logger.debug("Reenabling task %r", task)
- task.runnable.set()
- if waiting:
- break
-
- @tornado.gen.coroutine
- def cron_loop(self):
- """
- Asynchronous infinite loop to drive cron cycle.
- """
+ self.profile = args.profile
- logger.debug("cron_loop(): Starting")
- assert self.use_internal_cron
- logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay)
- yield tornado.gen.sleep(self.initial_delay)
- while True:
- logger.debug("cron_loop(): Running")
- yield self.cron_run()
- logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period)
- yield tornado.gen.sleep(self.cron_period)
-
- @tornado.gen.coroutine
- def cron_run(self):
- """
- Schedule periodic tasks.
- """
+ rpki.log.init("rpkid", args)
- now = rpki.sundial.now()
- logger.debug("Starting cron run")
- try:
- tenants = rpki.rpkidb.models.Tenant.objects.all()
- except:
- logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
- else:
- tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self))
- self.task_add(tasks)
- futures = [task.wait() for task in tasks]
- self.task_run()
- yield futures
- logger.info("Finished cron run started at %s", now)
-
- @tornado.gen.coroutine
- def cronjob_handler(self, handler):
- """
- External trigger to schedule periodic tasks. Obsolete for
- production use, but portions of the test framework still use this.
- """
+ self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid")
+ self.cfg.set_global_flags()
- if self.use_internal_cron:
- handler.set_status(500, "Running cron internally")
- else:
- logger.debug("Starting externally triggered cron")
- yield self.cron()
- handler.set_status(200)
- handler.finish()
+ if not args.foreground:
+ rpki.daemonize.daemon(pidfile = args.pidfile)
- @tornado.gen.coroutine
- def http_fetch(self, request, serialize_on_full_url = False):
- """
- Wrapper around tornado.httpclient.AsyncHTTPClient() which
- serializes requests to any particular HTTP server, to avoid
- spurious CMS replay errors.
- """
+ if self.profile:
+ import cProfile
+ prof = cProfile.Profile()
+ try:
+ prof.runcall(self.main)
+ finally:
+ prof.dump_stats(self.profile)
+ logger.info("Dumped profile data to %s", self.profile)
+ else:
+ self.main()
- # The current definition of "particular HTTP server" is based only
- # on the "netloc" portion of the URL, which in theory could
- # cause deadlocks in a loopback scenario; no such deadlocks have
- # shown up in testing, but if such a thing were to occur, it would
- # look like an otherwise inexplicable HTTP timeout. The solution,
- # should this occur, would be to use the entire URL as the lookup
- # key, perhaps only for certain protocols.
- #
- # The reason for the current scheme is that at least one protocol
- # (publication) uses RESTful URLs but has a single service-wide
- # CMS replay detection database, which translates to meaning that
- # we need to serialize all requests for that service, not just
- # requests to a particular URL.
-
- if serialize_on_full_url:
- netlock = request.url
- else:
- netlock = urlparse.urlparse(request.url).netloc
-
- try:
- lock = self.http_client_serialize[netlock]
- except KeyError:
- lock = self.http_client_serialize[netlock] = tornado.locks.Lock()
-
- http_client = tornado.httpclient.AsyncHTTPClient()
-
- with (yield lock.acquire()):
- response = yield http_client.fetch(request)
-
- raise tornado.gen.Return(response)
-
- @staticmethod
- def _compose_left_right_query():
- """
- Compose top level element of a left-right query to irdbd.
- """
+ def main(self):
- return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
- type = "query", version = rpki.left_right.version)
+ startup_msg = self.cfg.get("startup-message", "")
+ if startup_msg:
+ logger.info(startup_msg)
- @tornado.gen.coroutine
- def irdb_query(self, q_msg):
- """
- Perform an IRDB callback query.
- """
+ if self.profile:
+ logger.info("Running in profile mode with output to %s", self.profile)
- q_tags = set(q_pdu.tag for q_pdu in q_msg)
+ logger.debug("Initializing Django")
+ import django
+ django.setup()
- q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
+ logger.debug("Initializing rpkidb...")
+ global rpki # pylint: disable=W0602
+ import rpki.rpkidb # pylint: disable=W0621
- http_request = tornado.httpclient.HTTPRequest(
- url = self.irdb_url,
- method = "POST",
- body = q_der,
- headers = { "Content-Type" : rpki.left_right.content_type })
+ logger.debug("Initializing rpkidb...done")
- http_response = yield self.http_fetch(http_request)
+ self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
+ self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert"))
+ self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
+ self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert"))
+ self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key"))
- # Tornado already checked http_response.code for us
+ self.irdb_url = self.cfg.get("irdb-url")
- content_type = http_response.headers.get("Content-Type")
+ self.http_server_host = self.cfg.get("server-host", "")
+ self.http_server_port = self.cfg.getint("server-port")
+
+ self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
+
+ self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
+ self.cfg.getint("initial-delay-max", 120))
+
+ # Should be much longer in production
+ self.cron_period = self.cfg.getint("cron-period", 120)
+
+ if self.use_internal_cron:
+ logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay)
+ tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop)
+
+ logger.debug("Scheduling task loop")
+ tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop)
+
+ rpkid = self
+
+ class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.left_right_handler(self)
+
+ class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self, tenant_handle, child_handle): # pylint: disable=W0221
+ yield rpkid.up_down_handler(self, tenant_handle, child_handle)
+
+ class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.cronjob_handler(self)
+
+ application = tornado.web.Application((
+ (r"/left-right", LeftRightHandler),
+ (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler),
+ (r"/cronjob", CronjobHandler)))
+
+ application.listen(
+ address = self.http_server_host,
+ port = self.http_server_port)
+
+ tornado.ioloop.IOLoop.current().start()
+
+ def task_add(self, tasks):
+ """
+ Add zero or more tasks to the task queue.
+ """
+
+ for task in tasks:
+ if task in self.task_queue:
+ logger.debug("Task %r already queued", task)
+ else:
+ logger.debug("Adding %r to task queue", task)
+ self.task_queue.append(task)
+
+ def task_run(self):
+ """
+ Kick the task loop to notice recently added tasks.
+ """
+
+ self.task_event.set()
+
+ @tornado.gen.coroutine
+ def task_loop(self):
+ """
+ Asynchronous infinite loop to run background tasks.
+
+ This code is a bit finicky, because it's managing a collection of
+ Future objects which are running independently of the control flow
+ here, and the wave function doesn't collapse until we do a yield.
+
+ So we keep this brutally simple and don't try to hide too much of
+ it in the AbstractTask class. For similar reasons, AbstractTask
+ sets aside a .future instance variable for this method's use.
+ """
+
+ logger.debug("Starting task loop")
+ task_event_future = None
+
+ while True:
+ while None in self.task_queue:
+ self.task_queue.remove(None)
+
+ futures = []
+ for task in self.task_queue:
+ if task.future is None:
+ task.future = task.start()
+ futures.append(task.future)
+ if task_event_future is None:
+ task_event_future = self.task_event.wait()
+ futures.append(task_event_future)
+ iterator = tornado.gen.WaitIterator(*futures)
+
+ while not iterator.done():
+ yield iterator.next()
+ if iterator.current_future is task_event_future:
+ self.task_event.clear()
+ task_event_future = None
+ break
+ else:
+ task = self.task_queue[iterator.current_index]
+ task.future = None
+ waiting = task.waiting()
+ if not waiting:
+ self.task_queue[iterator.current_index] = None
+ for task in self.task_queue:
+ if task is not None and not task.runnable.is_set():
+ logger.debug("Reenabling task %r", task)
+ task.runnable.set()
+ if waiting:
+ break
+
+ @tornado.gen.coroutine
+ def cron_loop(self):
+ """
+ Asynchronous infinite loop to drive cron cycle.
+ """
+
+ logger.debug("cron_loop(): Starting")
+ assert self.use_internal_cron
+ logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay)
+ yield tornado.gen.sleep(self.initial_delay)
+ while True:
+ logger.debug("cron_loop(): Running")
+ yield self.cron_run()
+ logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period)
+ yield tornado.gen.sleep(self.cron_period)
+
+ @tornado.gen.coroutine
+ def cron_run(self):
+ """
+ Schedule periodic tasks.
+ """
+
+ now = rpki.sundial.now()
+ logger.debug("Starting cron run")
+ try:
+ tenants = rpki.rpkidb.models.Tenant.objects.all()
+ except:
+ logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
+ else:
+ tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self))
+ self.task_add(tasks)
+ futures = [task.wait() for task in tasks]
+ self.task_run()
+ yield futures
+ logger.info("Finished cron run started at %s", now)
+
+ @tornado.gen.coroutine
+ def cronjob_handler(self, handler):
+ """
+ External trigger to schedule periodic tasks. Obsolete for
+ production use, but portions of the test framework still use this.
+ """
+
+ if self.use_internal_cron:
+ handler.set_status(500, "Running cron internally")
+ else:
+ logger.debug("Starting externally triggered cron")
+ yield self.cron()
+ handler.set_status(200)
+ handler.finish()
+
+ @tornado.gen.coroutine
+ def http_fetch(self, request, serialize_on_full_url = False):
+ """
+ Wrapper around tornado.httpclient.AsyncHTTPClient() which
+ serializes requests to any particular HTTP server, to avoid
+ spurious CMS replay errors.
+ """
+
+ # The current definition of "particular HTTP server" is based only
+ # on the "netloc" portion of the URL, which in theory could
+ # cause deadlocks in a loopback scenario; no such deadlocks have
+ # shown up in testing, but if such a thing were to occur, it would
+ # look like an otherwise inexplicable HTTP timeout. The solution,
+ # should this occur, would be to use the entire URL as the lookup
+ # key, perhaps only for certain protocols.
+ #
+ # The reason for the current scheme is that at least one protocol
+ # (publication) uses RESTful URLs but has a single service-wide
+ # CMS replay detection database, which translates to meaning that
+ # we need to serialize all requests for that service, not just
+ # requests to a particular URL.
+
+ if serialize_on_full_url:
+ netlock = request.url
+ else:
+ netlock = urlparse.urlparse(request.url).netloc
- if content_type not in rpki.left_right.allowed_content_types:
- raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type))
+ try:
+ lock = self.http_client_serialize[netlock]
+ except KeyError:
+ lock = self.http_client_serialize[netlock] = tornado.locks.Lock()
- r_der = http_response.body
+ http_client = tornado.httpclient.AsyncHTTPClient()
- r_cms = rpki.left_right.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
+ with (yield lock.acquire()):
+ response = yield http_client.fetch(request)
- self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
+ raise tornado.gen.Return(response)
- #rpki.left_right.check_response(r_msg)
+ @staticmethod
+ def _compose_left_right_query():
+ """
+ Compose top level element of a left-right query to irdbd.
+ """
- if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
- raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
+ return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
+ type = "query", version = rpki.left_right.version)
- raise tornado.gen.Return(r_msg)
+ @tornado.gen.coroutine
+ def irdb_query(self, q_msg):
+ """
+ Perform an IRDB callback query.
+ """
- @tornado.gen.coroutine
- def irdb_query_child_resources(self, tenant_handle, child_handle):
- """
- Ask IRDB about a child's resources.
- """
+ q_tags = set(q_pdu.tag for q_pdu in q_msg)
- q_msg = self._compose_left_right_query()
- SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle)
+ q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
- r_msg = yield self.irdb_query(q_msg)
+ http_request = tornado.httpclient.HTTPRequest(
+ url = self.irdb_url,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.left_right.content_type })
- if len(r_msg) != 1:
- raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
+ http_response = yield self.http_fetch(http_request)
- bag = rpki.resource_set.resource_bag(
- asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
- v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
- v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
- valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))
+ # Tornado already checked http_response.code for us
- raise tornado.gen.Return(bag)
+ content_type = http_response.headers.get("Content-Type")
- @tornado.gen.coroutine
- def irdb_query_roa_requests(self, tenant_handle):
- """
- Ask IRDB about self's ROA requests.
- """
+ if content_type not in rpki.left_right.allowed_content_types:
+ raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type))
- q_msg = self._compose_left_right_query()
- SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle)
- r_msg = yield self.irdb_query(q_msg)
- raise tornado.gen.Return(r_msg)
+ r_der = http_response.body
- @tornado.gen.coroutine
- def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles):
- """
- Ask IRDB about self's ghostbuster record requests.
- """
+ r_cms = rpki.left_right.cms_msg(DER = r_der)
+ r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
- q_msg = self._compose_left_right_query()
- for parent_handle in parent_handles:
- SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests,
- tenant_handle = tenant_handle, parent_handle = parent_handle)
- r_msg = yield self.irdb_query(q_msg)
- raise tornado.gen.Return(r_msg)
+ self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
- @tornado.gen.coroutine
- def irdb_query_ee_certificate_requests(self, tenant_handle):
- """
- Ask IRDB about self's EE certificate requests.
- """
+ #rpki.left_right.check_response(r_msg)
- q_msg = self._compose_left_right_query()
- SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle)
- r_msg = yield self.irdb_query(q_msg)
- raise tornado.gen.Return(r_msg)
+ if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
+ raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
- @property
- def left_right_models(self):
- """
- Map element tag to rpkidb model.
- """
+ raise tornado.gen.Return(r_msg)
- try:
- return self._left_right_models
- except AttributeError:
- import rpki.rpkidb.models # pylint: disable=W0621
- self._left_right_models = {
- rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant,
- rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC,
- rpki.left_right.tag_parent : rpki.rpkidb.models.Parent,
- rpki.left_right.tag_child : rpki.rpkidb.models.Child,
- rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
- return self._left_right_models
-
- @property
- def left_right_trivial_handlers(self):
- """
- Map element tag to bound handler methods for trivial PDU types.
- """
+ @tornado.gen.coroutine
+ def irdb_query_child_resources(self, tenant_handle, child_handle):
+ """
+ Ask IRDB about a child's resources.
+ """
- try:
- return self._left_right_trivial_handlers
- except AttributeError:
- self._left_right_trivial_handlers = {
- rpki.left_right.tag_list_published_objects : self.handle_list_published_objects,
- rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
- return self._left_right_trivial_handlers
+ q_msg = self._compose_left_right_query()
+ SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle)
- def handle_list_published_objects(self, q_pdu, r_msg):
- """
- <list_published_objects/> server.
- """
+ r_msg = yield self.irdb_query(q_msg)
- tenant_handle = q_pdu.get("tenant_handle")
- msg_tag = q_pdu.get("tag")
-
- kw = dict(tenant_handle = tenant_handle)
- if msg_tag is not None:
- kw.update(tag = msg_tag)
-
- for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"):
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64()
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64()
- for c in ca_detail.child_certs.all():
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64()
- for r in ca_detail.roas.filter(roa__isnull = False):
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = r.uri, **kw).text = r.roa.get_Base64()
- for g in ca_detail.ghostbusters.all():
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = g.uri, **kw).text = g.ghostbuster.get_Base64()
- for c in ca_detail.ee_certificates.all():
- SubElement(r_msg, rpki.left_right.tag_list_published_objects,
- uri = c.uri, **kw).text = c.cert.get_Base64()
-
- def handle_list_received_resources(self, q_pdu, r_msg):
- """
- <list_received_resources/> server.
- """
+ if len(r_msg) != 1:
+ raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
- logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu))
- tenant_handle = q_pdu.get("tenant_handle")
- msg_tag = q_pdu.get("tag")
- for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle,
- state = "active", latest_ca_cert__isnull = False):
- cert = ca_detail.latest_ca_cert
- resources = cert.get_3779resources()
- r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources,
- tenant_handle = tenant_handle,
- parent_handle = ca_detail.ca.parent.parent_handle,
- uri = ca_detail.ca_cert_uri,
- notBefore = str(cert.getNotBefore()),
- notAfter = str(cert.getNotAfter()),
- sia_uri = cert.get_sia_directory_uri(),
- aia_uri = cert.get_aia_uri(),
- asn = str(resources.asn),
- ipv4 = str(resources.v4),
- ipv6 = str(resources.v6))
- if msg_tag is not None:
- r_pdu.set("tag", msg_tag)
-
- @tornado.gen.coroutine
- def left_right_handler(self, handler):
- """
- Process one left-right message.
- """
+ bag = rpki.resource_set.resource_bag(
+ asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
+ v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
+ v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
+ valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))
- logger.debug("Entering left_right_handler()")
+ raise tornado.gen.Return(bag)
- content_type = handler.request.headers["Content-Type"]
- if content_type not in rpki.left_right.allowed_content_types:
- handler.set_status(415, "No handler for Content-Type %s" % content_type)
- handler.finish()
- return
+ @tornado.gen.coroutine
+ def irdb_query_roa_requests(self, tenant_handle):
+ """
+ Ask IRDB about self's ROA requests.
+ """
- handler.set_header("Content-Type", rpki.left_right.content_type)
+ q_msg = self._compose_left_right_query()
+ SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
- try:
- q_cms = rpki.left_right.cms_msg(DER = handler.request.body)
- q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
- r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
- type = "reply", version = rpki.left_right.version)
- self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path)
+ @tornado.gen.coroutine
+ def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles):
+ """
+ Ask IRDB about self's ghostbuster record requests.
+ """
- assert q_msg.tag.startswith(rpki.left_right.xmlns)
- assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg)
+ q_msg = self._compose_left_right_query()
+ for parent_handle in parent_handles:
+ SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests,
+ tenant_handle = tenant_handle, parent_handle = parent_handle)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
- if q_msg.get("version") != rpki.left_right.version:
- raise rpki.exceptions.BadQuery("Unrecognized protocol version")
+ @tornado.gen.coroutine
+ def irdb_query_ee_certificate_requests(self, tenant_handle):
+ """
+ Ask IRDB about self's EE certificate requests.
+ """
- if q_msg.get("type") != "query":
- raise rpki.exceptions.BadQuery("Message type is not query")
+ q_msg = self._compose_left_right_query()
+ SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
- for q_pdu in q_msg:
+ @property
+ def left_right_models(self):
+ """
+ Map element tag to rpkidb model.
+ """
try:
- action = q_pdu.get("action")
- model = self.left_right_models.get(q_pdu.tag)
-
- if q_pdu.tag in self.left_right_trivial_handlers:
- self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
-
- elif action in ("get", "list"):
- for obj in model.objects.xml_list(q_pdu):
- obj.xml_template.encode(obj, q_pdu, r_msg)
-
- elif action == "destroy":
- obj = model.objects.xml_get_for_delete(q_pdu)
- yield obj.xml_pre_delete_hook(self)
- obj.delete()
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
+ return self._left_right_models
+ except AttributeError:
+ import rpki.rpkidb.models # pylint: disable=W0621
+ self._left_right_models = {
+ rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant,
+ rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC,
+ rpki.left_right.tag_parent : rpki.rpkidb.models.Parent,
+ rpki.left_right.tag_child : rpki.rpkidb.models.Child,
+ rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
+ return self._left_right_models
+
+ @property
+ def left_right_trivial_handlers(self):
+ """
+ Map element tag to bound handler methods for trivial PDU types.
+ """
- elif action in ("create", "set"):
- obj = model.objects.xml_get_or_create(q_pdu)
- obj.xml_template.decode(obj, q_pdu)
- obj.xml_pre_save_hook(q_pdu)
- obj.save()
- yield obj.xml_post_save_hook(self, q_pdu)
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
+ try:
+ return self._left_right_trivial_handlers
+ except AttributeError:
+ self._left_right_trivial_handlers = {
+ rpki.left_right.tag_list_published_objects : self.handle_list_published_objects,
+ rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
+ return self._left_right_trivial_handlers
+
+ def handle_list_published_objects(self, q_pdu, r_msg):
+ """
+ <list_published_objects/> server.
+ """
+
+ tenant_handle = q_pdu.get("tenant_handle")
+ msg_tag = q_pdu.get("tag")
+
+ kw = dict(tenant_handle = tenant_handle)
+ if msg_tag is not None:
+ kw.update(tag = msg_tag)
+
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"):
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64()
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64()
+ for c in ca_detail.child_certs.all():
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64()
+ for r in ca_detail.roas.filter(roa__isnull = False):
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = r.uri, **kw).text = r.roa.get_Base64()
+ for g in ca_detail.ghostbusters.all():
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = g.uri, **kw).text = g.ghostbuster.get_Base64()
+ for c in ca_detail.ee_certificates.all():
+ SubElement(r_msg, rpki.left_right.tag_list_published_objects,
+ uri = c.uri, **kw).text = c.cert.get_Base64()
+
+ def handle_list_received_resources(self, q_pdu, r_msg):
+ """
+ <list_received_resources/> server.
+ """
+
+ logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu))
+ tenant_handle = q_pdu.get("tenant_handle")
+ msg_tag = q_pdu.get("tag")
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle,
+ state = "active", latest_ca_cert__isnull = False):
+ cert = ca_detail.latest_ca_cert
+ resources = cert.get_3779resources()
+ r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources,
+ tenant_handle = tenant_handle,
+ parent_handle = ca_detail.ca.parent.parent_handle,
+ uri = ca_detail.ca_cert_uri,
+ notBefore = str(cert.getNotBefore()),
+ notAfter = str(cert.getNotAfter()),
+ sia_uri = cert.get_sia_directory_uri(),
+ aia_uri = cert.get_aia_uri(),
+ asn = str(resources.asn),
+ ipv4 = str(resources.v4),
+ ipv6 = str(resources.v6))
+ if msg_tag is not None:
+ r_pdu.set("tag", msg_tag)
+
    @tornado.gen.coroutine
    def left_right_handler(self, handler):
        """
        Process one left-right message.

        `handler` is the tornado request handler for this HTTP request.
        The request body is a CMS-wrapped left-right <msg/> query; the
        reply is a CMS-wrapped <msg/> of type "reply", signed with this
        rpkid's BPKI key.  All failures are reported via HTTP status or
        per-PDU <report_error/> elements; this coroutine itself does not
        raise to the caller.
        """

        logger.debug("Entering left_right_handler()")

        # Reject anything that isn't a left-right content type before
        # attempting CMS processing.
        content_type = handler.request.headers["Content-Type"]
        if content_type not in rpki.left_right.allowed_content_types:
            handler.set_status(415, "No handler for Content-Type %s" % content_type)
            handler.finish()
            return

        handler.set_header("Content-Type", rpki.left_right.content_type)

        try:
            # Verify the CMS wrapper against the IRBE's BPKI certificate
            # and guard against replayed messages via the CMS timestamp.
            q_cms = rpki.left_right.cms_msg(DER = handler.request.body)
            q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
            r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
                            type = "reply", version = rpki.left_right.version)
            self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path)

            # Sanity check: every element must be in the left-right namespace.
            assert q_msg.tag.startswith(rpki.left_right.xmlns)
            assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg)

            if q_msg.get("version") != rpki.left_right.version:
                raise rpki.exceptions.BadQuery("Unrecognized protocol version")

            if q_msg.get("type") != "query":
                raise rpki.exceptions.BadQuery("Message type is not query")

            for q_pdu in q_msg:

                try:
                    action = q_pdu.get("action")
                    model = self.left_right_models.get(q_pdu.tag)

                    # Trivial (read-only, non-ORM) PDUs have dedicated handlers;
                    # everything else is generic CRUD against the Django model
                    # selected by the PDU's element tag.
                    if q_pdu.tag in self.left_right_trivial_handlers:
                        self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)

                    elif action in ("get", "list"):
                        for obj in model.objects.xml_list(q_pdu):
                            obj.xml_template.encode(obj, q_pdu, r_msg)

                    elif action == "destroy":
                        # Pre-delete hook may perform asynchronous cleanup, hence the yield.
                        obj = model.objects.xml_get_for_delete(q_pdu)
                        yield obj.xml_pre_delete_hook(self)
                        obj.delete()
                        obj.xml_template.acknowledge(obj, q_pdu, r_msg)

                    elif action in ("create", "set"):
                        # Post-save hook may perform asynchronous work, hence the yield.
                        obj = model.objects.xml_get_or_create(q_pdu)
                        obj.xml_template.decode(obj, q_pdu)
                        obj.xml_pre_save_hook(q_pdu)
                        obj.save()
                        yield obj.xml_post_save_hook(self, q_pdu)
                        obj.xml_template.acknowledge(obj, q_pdu, r_msg)

                    else:
                        raise rpki.exceptions.BadQuery("Unrecognized action %r" % action)

                except Exception, e:
                    # NotFound is an expected client error, so don't log a traceback for it.
                    if not isinstance(e, rpki.exceptions.NotFound):
                        logger.exception("Unhandled exception serving left-right PDU %r", q_pdu)
                    # Report the failure as a <report_error/> PDU, echoing the
                    # query's tag and tenant_handle when present, then stop
                    # processing the remaining PDUs in this message.
                    error_tenant_handle = q_pdu.get("tenant_handle")
                    error_tag = q_pdu.get("tag")
                    r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__)
                    r_pdu.text = str(e)
                    if error_tag is not None:
                        r_pdu.set("tag", error_tag)
                    if error_tenant_handle is not None:
                        r_pdu.set("tenant_handle", error_tenant_handle)
                    break

            handler.set_status(200)
            handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
            logger.debug("Normal exit from left_right_handler()")

        except Exception, e:
            # Failure before we had a usable reply message (CMS unwrap,
            # replay check, version check, ...): report via HTTP 500.
            logger.exception("Unhandled exception serving left-right request")
            handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
            handler.finish()

    @tornado.gen.coroutine
    def up_down_handler(self, handler, tenant_handle, child_handle):
        """
        Process one up-down PDU.

        `handler` is the tornado request handler; `tenant_handle` and
        `child_handle` come from the request URL and select the Child
        object whose serve_up_down() coroutine does the real work.
        Errors are reported via HTTP status codes; this coroutine does
        not raise to the caller.
        """

        logger.debug("Entering up_down_handler()")

        # Reject anything that isn't an up-down content type up front.
        content_type = handler.request.headers["Content-Type"]
        if content_type not in rpki.up_down.allowed_content_types:
            handler.set_status(415, "No handler for Content-Type %s" % content_type)
            handler.finish()
            return

        try:
            # Delegate the DER-encoded query to the child; the reply is
            # also DER, returned verbatim as the HTTP response body.
            child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
            q_der = handler.request.body
            r_der = yield child.serve_up_down(self, q_der)
            handler.set_header("Content-Type", rpki.up_down.content_type)
            handler.set_status(200)
            handler.finish(r_der)

        except rpki.rpkidb.models.Child.DoesNotExist:
            # Unknown child is a client addressing error, not a server fault.
            logger.info("Child %r of tenant %r not found", child_handle, tenant_handle)
            handler.set_status(400, "Child %r not found" % child_handle)
            handler.finish()

        except Exception, e:
            # Anything else processing the PDU is reported as a bad request.
            logger.exception("Unhandled exception processing up-down request")
            handler.set_status(400, "Could not process PDU: %s" % e)
            handler.finish()

class publication_queue(object):
    """
    Utility to simplify publication from within rpkid.

    General idea here is to accumulate a collection of objects to be
    published, in one or more repositories, each potentially with its
    own completion callback.  Eventually we want to publish everything
    we've accumulated, at which point we need to iterate over the
    collection and do repository.call_pubd() for each repository.
    """

    # When True, queuing a second action for the same URI replaces the
    # earlier queued action rather than accumulating both.
    replace = True

    def __init__(self, rpkid):
        # rpkid is the main rpkid object, needed when talking to pubd.
        self.rpkid = rpkid
        self.clear()

    def clear(self):
        """
        Reset the queue to empty: no repositories, no messages, no handlers.
        """

        # Keyed by id(repository) -- see comment in queue() below.
        self.repositories = {}
        self.msgs = {}
        self.handlers = {}
        if self.replace:
            # URI -> queued PDU, used to detect and replace duplicates.
            self.uris = {}

    def queue(self, uri, repository, handler = None,
              old_obj = None, new_obj = None, old_hash = None):
        """
        Queue one publication action: publish new_obj at uri, withdraw
        the object at uri, or both (replace).  Exactly one of old_obj /
        old_hash may describe the currently published object; new_obj
        of None means a pure withdrawal.  handler, if given, is called
        back when pubd acknowledges this PDU.
        """

        # Must be publishing, withdrawing, or replacing something, and the
        # old object may be named either directly or by hash, not both.
        assert old_obj is not None or new_obj is not None or old_hash is not None
        assert old_obj is None or old_hash is None
        assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri))
        assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri))

        logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s",
                     uri, old_obj, new_obj, old_hash)

        # id(repository) may need to change to repository.peer_contact_uri
        # once we convert from our custom SQL cache to Django ORM.

        rid = id(repository)
        if rid not in self.repositories:
            self.repositories[rid] = repository
            self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
                                     type = "query", version = rpki.publication.version)

        # Work out the "hash" attribute identifying what pubd currently
        # holds at this URI.  A superseded queued PDU donates its hash;
        # otherwise use the caller-supplied hash or the SHA-256 of the
        # old object's DER; None means nothing is currently published.
        if self.replace and uri in self.uris:
            logger.debug("Removing publication duplicate %r", self.uris[uri])
            old_pdu = self.uris.pop(uri)
            self.msgs[rid].remove(old_pdu)
            pdu_hash = old_pdu.get("hash")
        elif old_hash is not None:
            pdu_hash = old_hash
        elif old_obj is None:
            pdu_hash = None
        else:
            pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex")

        # Build the <publish/> or <withdraw/> PDU.  Withdrawal always
        # carries a hash; publication carries one only when replacing.
        if new_obj is None:
            pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash)
        else:
            pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri)
            pdu.text = new_obj.get_Base64()
            if pdu_hash is not None:
                pdu.set("hash", pdu_hash)

        # Tag the PDU so the acknowledgment can be routed back to handler.
        if handler is not None:
            tag = str(id(pdu))
            self.handlers[tag] = handler
            pdu.set("tag", tag)

        if self.replace:
            self.uris[uri] = pdu

    @tornado.gen.coroutine
    def call_pubd(self):
        """
        Send every accumulated message to its repository's pubd, then
        reset the queue.
        """

        for rid in self.repositories:
            logger.debug("Calling pubd[%r]", self.repositories[rid])
            yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers)
        self.clear()

    @property
    def size(self):
        # Total number of PDUs queued across all repositories.
        return sum(len(self.msgs[rid]) for rid in self.repositories)

    def empty(self):
        """
        True iff no messages have been queued since the last clear().
        """

        return not self.msgs
