def cache_images(self, context, aggregate, image_ids):
    """Ask the conductor to pre-cache images on an aggregate's hosts.

    Each image ID is first looked up through the image API so that a
    bad ID is rejected here, once, rather than by every host that would
    otherwise be contacted. Lookup errors are deliberately not caught.

    :param context: The RequestContext
    :param aggregate: The objects.Aggregate representing the hosts to
                      contact
    :param image_ids: A list of image ID strings to send to the hosts
    """
    for wanted_id in image_ids:
        # Only the side effect matters: the lookup raises if the image
        # cannot be fetched, and that exception bubbles up to the API
        # layer (NovaException maps to a 4xx there, anything else to a
        # 500).
        self.image_api.get(context, wanted_id)

    # Every ID resolved, so hand the request over to the conductor.
    conductor_rpc = self.conductor_compute_rpcapi
    conductor_rpc.cache_images(context, aggregate, image_ids)
def cache_images(self, context, aggregate, image_ids):
    """Cache a set of images on the set of hosts in an aggregate.

    Emits an IMAGE_CACHE start notification, groups the aggregate's
    hosts by the cell they are mapped to, asks every host whose compute
    service is up to cache the images, waits for all of those requests
    to finish and finally emits the matching end notification.

    :param context: The RequestContext
    :param aggregate: The Aggregate object from the request to constrain
                      the host list
    :param image_ids: The IDs of the images to cache
    """

    def _cache_on_host(ctxt, host):
        # Runs in a greenthread started with spawn_n(), which gives the
        # caller no way to see an exception. Log it here so a failed
        # request to one host is recorded; there is nothing to re-raise
        # to, and the other hosts are unaffected.
        try:
            self.compute_rpcapi.cache_images(ctxt,
                                             host=host,
                                             image_ids=image_ids)
        except Exception:
            LOG.exception('Image pre-cache request to compute %(host)r '
                          'failed', {'host': host})

    # TODO(danms): Fix notification sample for IMAGE_CACHE action
    compute_utils.notify_about_aggregate_action(
        context, aggregate,
        fields.NotificationAction.IMAGE_CACHE,
        fields.NotificationPhase.START)

    clock = timeutils.StopWatch()
    # The pool size bounds how many per-host requests are in flight at
    # once.
    threads = CONF.image_cache.precache_concurrency
    fetch_pool = eventlet.GreenPool(size=threads)

    hosts_by_cell = {}
    cells_by_uuid = {}
    # TODO(danms): Make this a much more efficient bulk query
    for hostname in aggregate.hosts:
        hmap = objects.HostMapping.get_by_host(context, hostname)
        cell_mapping = hmap.cell_mapping
        cells_by_uuid.setdefault(cell_mapping.uuid, cell_mapping)
        hosts_by_cell.setdefault(cell_mapping.uuid, []).append(hostname)

    LOG.info('Preparing to request pre-caching of image(s) %(image_ids)s '
             'on %(hosts)i hosts across %(cells)s cells.',
             {'image_ids': ','.join(image_ids),
              'hosts': len(aggregate.hosts),
              'cells': len(hosts_by_cell)})
    clock.start()

    for cell_uuid, hosts in hosts_by_cell.items():
        cell = cells_by_uuid[cell_uuid]
        # Service records and compute RPC are reached through a context
        # targeted at the host's cell.
        with nova_context.target_cell(context, cell) as target_ctxt:
            for host in hosts:
                service = objects.Service.get_by_compute_host(target_ctxt,
                                                              host)
                if not self.servicegroup_api.service_is_up(service):
                    LOG.info(
                        'Skipping image pre-cache request to compute '
                        '%(host)r because it is not up',
                        {'host': host})
                    continue

                fetch_pool.spawn_n(_cache_on_host, target_ctxt, host)

    # Wait until all those things finish
    fetch_pool.waitall()

    clock.stop()
    LOG.info('Image pre-cache operation for image(s) %(image_ids)s '
             'completed in %(time).2f seconds',
             {'image_ids': ','.join(image_ids),
              'time': clock.elapsed()})

    compute_utils.notify_about_aggregate_action(
        context, aggregate,
        fields.NotificationAction.IMAGE_CACHE,
        fields.NotificationPhase.END)
def cache_images(self, ctxt, aggregate, image_ids):
    """Cast a ``cache_images`` request to the conductor.

    The method was added in RPC version 1.21, so the request is only
    sent when the client is able to send that version.

    :param ctxt: The RequestContext
    :param aggregate: The aggregate whose hosts should cache the images
    :param image_ids: A list of image ID strings
    :raises: exception.NovaException if version 1.21 cannot be sent
    """
    required_version = '1.21'
    if self.client.can_send_version(required_version):
        prepared = self.client.prepare(version=required_version)
        prepared.cast(ctxt, 'cache_images',
                      aggregate=aggregate,
                      image_ids=image_ids)
        return

    # The version pin is too old for this call; refuse loudly rather
    # than silently dropping the request.
    pin_error = ('Conductor RPC version pin does not allow '
                 'cache_images() to be called')
    raise exception.NovaException(pin_error)
from oslo_config import cfg

# Group under which every image-cache option is registered.
imagecache_group = cfg.OptGroup(
    name='image_cache',
    title='Image Cache Options',
    help="""
A collection of options specific to image caching.
""")

# Integer option, default 1 and never below 1.
_precache_concurrency = cfg.IntOpt(
    name='precache_concurrency',
    default=1,
    min=1,
    help="""
Maximum number of compute hosts to trigger image precaching in parallel.

When an image precache request is made, compute nodes will be contacted
to initiate the download. This number constrains the number of those that
will happen in parallel. Higher numbers will cause more computes to work
in parallel and may result in reduced time to complete the operation, but
may also DDoS the image service. Lower numbers will result in more sequential
operation, lower image service load, but likely longer runtime to completion.
""")

imagecache_opts = [
    _precache_concurrency,
]


ALL_OPTS = (imagecache_opts,)


def register_opts(conf):
    """Register the image_cache group and its options on ``conf``."""
    conf.register_group(imagecache_group)
    conf.register_opts(imagecache_opts, group=imagecache_group)


def list_opts():
    """Return a dict mapping the image_cache group to its options."""
    opts_by_group = {}
    opts_by_group[imagecache_group] = imagecache_opts
    return opts_by_group
RESIZE, VOLUME_SWAP, SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH, @@ -877,7 +878,7 @@ class NotificationAction(BaseNovaEnum): REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK, REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE, CONNECT, USAGE, BUILD_INSTANCES, MIGRATE_SERVER, REBUILD_SERVER, - SELECT_DESTINATIONS) + SELECT_DESTINATIONS, IMAGE_CACHE) # TODO(rlrossit): These should be changed over to be a StateMachine enum from diff --git a/nova/tests/functional/compute/test_cache_image.py b/nova/tests/functional/compute/test_cache_image.py new file mode 100644 index 00000000000..844ecae0bd1 --- /dev/null +++ b/nova/tests/functional/compute/test_cache_image.py @@ -0,0 +1,76 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
from oslo_utils.fixture import uuidsentinel as uuids

from nova import context
from nova import objects
from nova import test
from nova.tests.unit import fake_notifier


class ImageCacheTest(test.TestCase):
    """Functional test of the conductor's image pre-cache fan-out."""

    NUMBER_OF_CELLS = 2

    def setUp(self):
        super(ImageCacheTest, self).setUp()

        self.flags(compute_driver='fake.FakeDriverWithCaching')

        fake_notifier.stub_notifier(self)
        self.addCleanup(fake_notifier.reset)
        self.context = context.get_admin_context()

        self.conductor = self.start_service('conductor')

        # compute1 and compute2 are started without an explicit cell,
        # compute3 through compute5 are placed in cell2. Each service is
        # stored on the test case under its host name.
        for hostname in ('compute1', 'compute2'):
            setattr(self, hostname,
                    self.start_service('compute', host=hostname))
        for hostname in ('compute3', 'compute4', 'compute5'):
            setattr(self, hostname,
                    self.start_service('compute', host=hostname,
                                       cell='cell2'))

        # Mark compute5 as forced-down so the conductor should skip it.
        second_cell = self.cell_mappings['cell2']
        with context.target_cell(self.context, second_cell) as cctxt:
            down_service = objects.Service.get_by_compute_host(cctxt,
                                                               'compute5')
            down_service.forced_down = True
            down_service.save()

    def test_cache_image(self):
        """Inject a cache request straight into the conductor service
        and check that it fans out to exactly the expected nodes.
        """
        requested_hosts = ['compute1', 'compute3', 'compute4', 'compute5']
        aggregate = objects.Aggregate(name='test',
                                      uuid=uuids.aggregate,
                                      id=1,
                                      hosts=requested_hosts)
        self.conductor.compute_task_mgr.cache_images(
            self.context, aggregate, ['an-image'])

        # NOTE(danms): We expect only three image cache attempts because
        # compute5 is marked as forced-down and compute2 is not in the
        # requested aggregate.
        expectations = [
            ('compute1', {'an-image'}),
            ('compute3', {'an-image'}),
            ('compute4', {'an-image'}),
            ('compute2', set()),
            ('compute5', set()),
        ]
        for hostname, expected_images in expectations:
            service = getattr(self, hostname)
            self.assertEqual(expected_images, service.driver.cached_images)

        for event_type in ('aggregate.cache_images.start',
                           'aggregate.cache_images.end'):
            fake_notifier.wait_for_versioned_notifications(event_type)
def test_cache_images_fail(self):
    """An unexpected image lookup error is raised as-is, with no RPC."""
    image_api = self.conductor.image_api
    rpcapi = self.conductor.conductor_compute_rpcapi
    requested = [mock.sentinel.image1, mock.sentinel.image2]

    with mock.patch.object(image_api, 'get') as mock_image:
        with mock.patch.object(rpcapi, 'cache_images') as mock_cache:
            mock_image.side_effect = test.TestingException()
            # We should expect to see non-NovaException errors
            # raised directly so the API can 500 for them.
            self.assertRaises(test.TestingException,
                              self.conductor.cache_images,
                              self.context,
                              mock.sentinel.aggregate,
                              requested)
            mock_cache.assert_not_called()


def test_cache_images_missing(self):
    """ImageNotFound from the image lookup is raised, with no RPC."""
    image_api = self.conductor.image_api
    rpcapi = self.conductor.conductor_compute_rpcapi
    requested = [mock.sentinel.image1, mock.sentinel.image2]

    with mock.patch.object(image_api, 'get') as mock_image:
        with mock.patch.object(rpcapi, 'cache_images') as mock_cache:
            mock_image.side_effect = exc.ImageNotFound('foo')
            self.assertRaises(exc.ImageNotFound,
                              self.conductor.cache_images,
                              self.context,
                              mock.sentinel.aggregate,
                              requested)
            mock_cache.assert_not_called()
class FakeDriverWithCaching(FakeDriver):
    """Fake driver that remembers which image IDs it was asked to cache."""

    def __init__(self, *args, **kwargs):
        super(FakeDriverWithCaching, self).__init__(*args, **kwargs)
        # IDs of every image passed to cache_image() so far.
        self.cached_images = set()

    def cache_image(self, context, image_id):
        """Record ``image_id`` as cached.

        :param context: Accepted but not used by this fake
        :param image_id: The image ID to record
        :returns: True if the ID was newly recorded, False if it was
                  already present
        """
        newly_cached = image_id not in self.cached_images
        if newly_cached:
            self.cached_images.add(image_id)
        return newly_cached