From 11d909c2cbf80ce899b7a68ce72897dfb2945fed Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Thu, 3 Oct 2019 11:55:00 -0700 Subject: [PATCH] Add cache_images() to conductor This adds the bulk of the image pre-caching logic to the conductor task manager. It takes an aggregate and list of image ids from the API service and handles the process of calling to the relevant compute nodes to initiate the image downloads, honoring the (new) config knob for overall task parallelism. Related to blueprint image-precache-support Change-Id: Id7c0ab7ae0586d49d88ff2afae149e25e59a3489 --- nova/conductor/api.py | 19 +++++ nova/conductor/manager.py | 70 ++++++++++++++++- nova/conductor/rpcapi.py | 11 +++ nova/conf/__init__.py | 2 + nova/conf/imagecache.py | 47 ++++++++++++ nova/notifications/objects/base.py | 3 +- nova/objects/fields.py | 3 +- .../functional/compute/test_cache_image.py | 76 +++++++++++++++++++ nova/tests/unit/conductor/test_conductor.py | 70 +++++++++++++++++ .../objects/test_notification.py | 2 +- nova/virt/fake.py | 13 ++++ 11 files changed, 312 insertions(+), 4 deletions(-) create mode 100644 nova/conf/imagecache.py create mode 100644 nova/tests/functional/compute/test_cache_image.py diff --git a/nova/conductor/api.py b/nova/conductor/api.py index a7c49d7b9d0..3ad5a4b168b 100644 --- a/nova/conductor/api.py +++ b/nova/conductor/api.py @@ -20,6 +20,7 @@ from nova import baserpc from nova.conductor import rpcapi import nova.conf +from nova import image CONF = nova.conf.CONF @@ -83,6 +84,7 @@ class ComputeTaskAPI(object): def __init__(self): self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI() + self.image_api = image.API() # TODO(stephenfin): Remove the 'reservations' parameter since we don't use # reservations anymore @@ -155,3 +157,20 @@ def rebuild_instance(self, context, instance, orig_image_ref, image_ref, preserve_ephemeral=preserve_ephemeral, host=host, request_spec=request_spec) + + def cache_images(self, context, aggregate, image_ids): + """Request images be pre-cached on hosts within an aggregate. + + :param context: The RequestContext + :param aggregate: The objects.Aggregate representing the hosts to + contact + :param image_ids: A list of image ID strings to send to the hosts + """ + for image_id in image_ids: + # Validate that we can get the image by id before we go + # ask a bunch of hosts to do the same. We let this bubble + # up to the API, which catches NovaException for the 4xx and + # otherwise 500s if this fails in some unexpected way. + self.image_api.get(context, image_id) + self.conductor_compute_rpcapi.cache_images(context, aggregate, + image_ids) diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index e052f3798af..ce29fbb64e0 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -16,6 +16,7 @@ import contextlib import copy +import eventlet import functools import sys @@ -230,7 +231,7 @@ class ComputeTaskManager(base.Base): may involve coordinating activities on multiple compute nodes. """ - target = messaging.Target(namespace='compute_task', version='1.20') + target = messaging.Target(namespace='compute_task', version='1.21') def __init__(self): super(ComputeTaskManager, self).__init__() @@ -1629,3 +1630,70 @@ def _delete_build_request(self, context, build_request, instance, cell, pass return False return True + + def cache_images(self, context, aggregate, image_ids): + """Cache a set of images on the set of hosts in an aggregate. + + :param context: The RequestContext + :param aggregate: The Aggregate object from the request to constrain + the host list + :param image_id: The IDs of the image to cache + """ + + # TODO(danms): Fix notification sample for IMAGE_CACHE action + compute_utils.notify_about_aggregate_action( + context, aggregate, + fields.NotificationAction.IMAGE_CACHE, + fields.NotificationPhase.START) + + clock = timeutils.StopWatch() + threads = CONF.image_cache.precache_concurrency + fetch_pool = eventlet.GreenPool(size=threads) + + hosts_by_cell = {} + cells_by_uuid = {} + # TODO(danms): Make this a much more efficient bulk query + for hostname in aggregate.hosts: + hmap = objects.HostMapping.get_by_host(context, hostname) + cells_by_uuid.setdefault(hmap.cell_mapping.uuid, hmap.cell_mapping) + hosts_by_cell.setdefault(hmap.cell_mapping.uuid, []) + hosts_by_cell[hmap.cell_mapping.uuid].append(hostname) + + LOG.info('Preparing to request pre-caching of image(s) %(image_ids)s ' + 'on %(hosts)i hosts across %(cells)s cells.', + {'image_ids': ','.join(image_ids), + 'hosts': len(aggregate.hosts), + 'cells': len(hosts_by_cell)}) + clock.start() + + for cell_uuid, hosts in hosts_by_cell.items(): + cell = cells_by_uuid[cell_uuid] + with nova_context.target_cell(context, cell) as target_ctxt: + for host in hosts: + service = objects.Service.get_by_compute_host(target_ctxt, + host) + if not self.servicegroup_api.service_is_up(service): + LOG.info( + 'Skipping image pre-cache request to compute ' + '%(host)r because it is not up', + {'host': host}) + continue + + fetch_pool.spawn_n(self.compute_rpcapi.cache_images, + target_ctxt, + host=host, + image_ids=image_ids) + + # Wait until all those things finish + fetch_pool.waitall() + + clock.stop() + LOG.info('Image pre-cache operation for image(s) %(image_ids)s ' + 'completed in %(time).2f seconds', + {'image_ids': ','.join(image_ids), + 'time': clock.elapsed()}) + + compute_utils.notify_about_aggregate_action( + context, aggregate, + fields.NotificationAction.IMAGE_CACHE, + fields.NotificationPhase.END) diff --git a/nova/conductor/rpcapi.py b/nova/conductor/rpcapi.py index 6a93d92a0e3..bc17f6faded 100644 --- a/nova/conductor/rpcapi.py +++ b/nova/conductor/rpcapi.py @@ -20,6 +20,7 @@ from oslo_versionedobjects import base as ovo_base import nova.conf +from nova import exception from nova.objects import base as objects_base from nova import profiler from nova import rpc @@ -281,6 +282,7 @@ class ComputeTaskAPI(object): instance. 1.20 - migrate_server() now gets a 'host_list' parameter that represents potential alternate hosts for retries within a cell. + 1.21 - Added cache_images() """ def __init__(self): @@ -436,3 +438,12 @@ def rebuild_instance(self, ctxt, instance, new_pass, injected_files, del kw['request_spec'] cctxt = self.client.prepare(version=version) cctxt.cast(ctxt, 'rebuild_instance', **kw) + + def cache_images(self, ctxt, aggregate, image_ids): + version = '1.21' + if not self.client.can_send_version(version): + raise exception.NovaException('Conductor RPC version pin does not ' + 'allow cache_images() to be called') + cctxt = self.client.prepare(version=version) + cctxt.cast(ctxt, 'cache_images', aggregate=aggregate, + image_ids=image_ids) diff --git a/nova/conf/__init__.py b/nova/conf/__init__.py index 36c3583e921..7cf5b197067 100644 --- a/nova/conf/__init__.py +++ b/nova/conf/__init__.py @@ -35,6 +35,7 @@ from nova.conf import glance from nova.conf import guestfs from nova.conf import hyperv +from nova.conf import imagecache from nova.conf import ironic from nova.conf import key_manager from nova.conf import keystone @@ -88,6 +89,7 @@ guestfs.register_opts(CONF) hyperv.register_opts(CONF) mks.register_opts(CONF) +imagecache.register_opts(CONF) ironic.register_opts(CONF) key_manager.register_opts(CONF) keystone.register_opts(CONF) diff --git a/nova/conf/imagecache.py b/nova/conf/imagecache.py new file mode 100644 index 00000000000..932c9ceee1a --- /dev/null +++ b/nova/conf/imagecache.py @@ -0,0 +1,47 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +imagecache_group = cfg.OptGroup( + 'image_cache', + title='Image Cache Options', + help=""" +A collection of options specific to image caching. +""") +imagecache_opts = [ + cfg.IntOpt('precache_concurrency', + default=1, + min=1, + help=""" +Maximum number of compute hosts to trigger image precaching in parallel. + +When an image precache request is made, compute nodes will be contacted +to initiate the download. This number constrains the number of those that +will happen in parallel. Higher numbers will cause more computes to work +in parallel and may result in reduced time to complete the operation, but +may also DDoS the image service. Lower numbers will result in more sequential +operation, lower image service load, but likely longer runtime to completion. +"""), +] + + +ALL_OPTS = (imagecache_opts,) + + +def register_opts(conf): + conf.register_group(imagecache_group) + conf.register_opts(imagecache_opts, group=imagecache_group) + + +def list_opts(): + return {imagecache_group: imagecache_opts} diff --git a/nova/notifications/objects/base.py b/nova/notifications/objects/base.py index 18485d583f3..3e223776fec 100644 --- a/nova/notifications/objects/base.py +++ b/nova/notifications/objects/base.py @@ -71,7 +71,8 @@ class EventType(NotificationObject): # NotificationActionField enum # Version 1.19: SELECT_DESTINATIONS is added to the NotificationActionField # enum - VERSION = '1.19' + # Version 1.20: IMAGE_CACHE is added to the NotificationActionField enum + VERSION = '1.20' fields = { 'object': fields.StringField(nullable=False), diff --git a/nova/objects/fields.py b/nova/objects/fields.py index a768d0236af..fbd2854389a 100644 --- a/nova/objects/fields.py +++ b/nova/objects/fields.py @@ -864,6 +864,7 @@ class NotificationAction(BaseNovaEnum): BUILD_INSTANCES = 'build_instances' MIGRATE_SERVER = 'migrate_server' REBUILD_SERVER = 'rebuild_server' + IMAGE_CACHE = 'cache_images' ALL = (UPDATE, EXCEPTION, DELETE, PAUSE, UNPAUSE, RESIZE, VOLUME_SWAP, SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH, @@ -877,7 +878,7 @@ class NotificationAction(BaseNovaEnum): REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK, REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE, CONNECT, USAGE, BUILD_INSTANCES, MIGRATE_SERVER, REBUILD_SERVER, - SELECT_DESTINATIONS) + SELECT_DESTINATIONS, IMAGE_CACHE) # TODO(rlrossit): These should be changed over to be a StateMachine enum from diff --git a/nova/tests/functional/compute/test_cache_image.py b/nova/tests/functional/compute/test_cache_image.py new file mode 100644 index 00000000000..844ecae0bd1 --- /dev/null +++ b/nova/tests/functional/compute/test_cache_image.py @@ -0,0 +1,76 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils.fixture import uuidsentinel as uuids + +from nova import context +from nova import objects +from nova import test +from nova.tests.unit import fake_notifier + + +class ImageCacheTest(test.TestCase): + NUMBER_OF_CELLS = 2 + + def setUp(self): + super(ImageCacheTest, self).setUp() + + self.flags(compute_driver='fake.FakeDriverWithCaching') + + fake_notifier.stub_notifier(self) + self.addCleanup(fake_notifier.reset) + self.context = context.get_admin_context() + + self.conductor = self.start_service('conductor') + self.compute1 = self.start_service('compute', host='compute1') + self.compute2 = self.start_service('compute', host='compute2') + self.compute3 = self.start_service('compute', host='compute3', + cell='cell2') + self.compute4 = self.start_service('compute', host='compute4', + cell='cell2') + self.compute5 = self.start_service('compute', host='compute5', + cell='cell2') + + cell2 = self.cell_mappings['cell2'] + with context.target_cell(self.context, cell2) as cctxt: + srv = objects.Service.get_by_compute_host(cctxt, 'compute5') + srv.forced_down = True + srv.save() + + def test_cache_image(self): + """Test caching images by injecting the request directly to + the conductor service and making sure it fans out and calls + the expected nodes. + """ + + aggregate = objects.Aggregate(name='test', + uuid=uuids.aggregate, + id=1, + hosts=['compute1', 'compute3', + 'compute4', 'compute5']) + self.conductor.compute_task_mgr.cache_images( + self.context, aggregate, ['an-image']) + + # NOTE(danms): We expect only three image cache attempts because + # compute5 is marked as forced-down and compute2 is not in the + # requested aggregate. + for host in ['compute1', 'compute3', 'compute4']: + mgr = getattr(self, host) + self.assertEqual(set(['an-image']), mgr.driver.cached_images) + for host in ['compute2', 'compute5']: + mgr = getattr(self, host) + self.assertEqual(set(), mgr.driver.cached_images) + + fake_notifier.wait_for_versioned_notifications( + 'aggregate.cache_images.start') + fake_notifier.wait_for_versioned_notifications( + 'aggregate.cache_images.end') diff --git a/nova/tests/unit/conductor/test_conductor.py b/nova/tests/unit/conductor/test_conductor.py index 0f6689eba24..6ab271de039 100644 --- a/nova/tests/unit/conductor/test_conductor.py +++ b/nova/tests/unit/conductor/test_conductor.py @@ -3567,6 +3567,23 @@ def _test(prepare_mock, can_send_mock): self.context, 'build_instances', **kw) _test() + def test_cache_images(self): + with mock.patch.object(self.conductor, 'client') as client: + self.conductor.cache_images(self.context, mock.sentinel.aggregate, + [mock.sentinel.image]) + client.prepare.return_value.cast.assert_called_once_with( + self.context, 'cache_images', + aggregate=mock.sentinel.aggregate, + image_ids=[mock.sentinel.image]) + client.prepare.assert_called_once_with(version='1.21') + + with mock.patch.object(self.conductor.client, 'can_send_version') as v: + v.return_value = False + self.assertRaises(exc.NovaException, + self.conductor.cache_images, + self.context, mock.sentinel.aggregate, + [mock.sentinel.image]) + class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase): """Compute task API Tests.""" @@ -3591,3 +3608,56 @@ def test_live_migrate(self): self.context, inst_obj, {'host': 'destination'}, True, False, None, 'block_migration', 'disk_over_commit', None, request_spec=None) + + def test_cache_images(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + self.conductor.cache_images(self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_image.assert_has_calls([mock.call(self.context, + mock.sentinel.image1), + mock.call(self.context, + mock.sentinel.image2)]) + mock_cache.assert_called_once_with( + self.context, mock.sentinel.aggregate, + [mock.sentinel.image1, mock.sentinel.image2]) + + _test() + + def test_cache_images_fail(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + mock_image.side_effect = test.TestingException() + # We should expect to see non-NovaException errors + # raised directly so the API can 500 for them. + self.assertRaises(test.TestingException, + self.conductor.cache_images, + self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_cache.assert_not_called() + + _test() + + def test_cache_images_missing(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + mock_image.side_effect = exc.ImageNotFound('foo') + self.assertRaises(exc.ImageNotFound, + self.conductor.cache_images, + self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_cache.assert_not_called() + + _test() diff --git a/nova/tests/unit/notifications/objects/test_notification.py b/nova/tests/unit/notifications/objects/test_notification.py index c05287bfa19..47cc29f8ba0 100644 --- a/nova/tests/unit/notifications/objects/test_notification.py +++ b/nova/tests/unit/notifications/objects/test_notification.py @@ -375,7 +375,7 @@ def test_payload_is_not_generated_if_notification_format_is_unversioned( 'ComputeTaskNotification': '1.0-a73147b93b520ff0061865849d3dfa56', 'ComputeTaskPayload': '1.0-e3d34762c14d131c98337b72e8c600e1', 'DestinationPayload': '1.0-4ccf26318dd18c4377dada2b1e74ec2e', - 'EventType': '1.19-000a76e83b06a9de11d365465a755a5e', + 'EventType': '1.20-4e02a676d3a18cab99579cacd1c91453', 'ExceptionNotification': '1.0-a73147b93b520ff0061865849d3dfa56', 'ExceptionPayload': '1.1-6c43008bd81885a63bc7f7c629f0793b', 'FlavorNotification': '1.0-a73147b93b520ff0061865849d3dfa56', diff --git a/nova/virt/fake.py b/nova/virt/fake.py index e185ad14b0f..2bbae8bd1b8 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -1013,3 +1013,16 @@ def get_available_resource(self, nodename): }, ]) return host_status + + +class FakeDriverWithCaching(FakeDriver): + def __init__(self, *a, **k): + super(FakeDriverWithCaching, self).__init__(*a, **k) + self.cached_images = set() + + def cache_image(self, context, image_id): + if image_id in self.cached_images: + return False + else: + self.cached_images.add(image_id) + return True