Skip to content

Commit 11d909c

Browse files
committed
Add cache_images() to conductor
This adds the bulk of the image pre-caching logic to the conductor task manager. It takes an aggregate and list of image ids from the API service and handles the process of calling to the relevant compute nodes to initiate the image downloads, honoring the (new) config knob for overall task parallelism. Related to blueprint image-precache-support Change-Id: Id7c0ab7ae0586d49d88ff2afae149e25e59a3489
1 parent ac16511 commit 11d909c

File tree

11 files changed

+312
-4
lines changed

11 files changed

+312
-4
lines changed

nova/conductor/api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from nova import baserpc
2121
from nova.conductor import rpcapi
2222
import nova.conf
23+
from nova import image
2324

2425
CONF = nova.conf.CONF
2526

@@ -83,6 +84,7 @@ class ComputeTaskAPI(object):
8384

8485
def __init__(self):
8586
self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()
87+
self.image_api = image.API()
8688

8789
# TODO(stephenfin): Remove the 'reservations' parameter since we don't use
8890
# reservations anymore
@@ -155,3 +157,20 @@ def rebuild_instance(self, context, instance, orig_image_ref, image_ref,
155157
preserve_ephemeral=preserve_ephemeral,
156158
host=host,
157159
request_spec=request_spec)
160+
161+
def cache_images(self, context, aggregate, image_ids):
162+
"""Request images be pre-cached on hosts within an aggregate.
163+
164+
:param context: The RequestContext
165+
:param aggregate: The objects.Aggregate representing the hosts to
166+
contact
167+
:param image_ids: A list of image ID strings to send to the hosts
168+
"""
169+
for image_id in image_ids:
170+
# Validate that we can get the image by id before we go
171+
# ask a bunch of hosts to do the same. We let this bubble
172+
# up to the API, which catches NovaException for the 4xx and
173+
# otherwise 500s if this fails in some unexpected way.
174+
self.image_api.get(context, image_id)
175+
self.conductor_compute_rpcapi.cache_images(context, aggregate,
176+
image_ids)

nova/conductor/manager.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import contextlib
1818
import copy
19+
import eventlet
1920
import functools
2021
import sys
2122

@@ -230,7 +231,7 @@ class ComputeTaskManager(base.Base):
230231
may involve coordinating activities on multiple compute nodes.
231232
"""
232233

233-
target = messaging.Target(namespace='compute_task', version='1.20')
234+
target = messaging.Target(namespace='compute_task', version='1.21')
234235

235236
def __init__(self):
236237
super(ComputeTaskManager, self).__init__()
@@ -1629,3 +1630,70 @@ def _delete_build_request(self, context, build_request, instance, cell,
16291630
pass
16301631
return False
16311632
return True
1633+
1634+
def cache_images(self, context, aggregate, image_ids):
1635+
"""Cache a set of images on the set of hosts in an aggregate.
1636+
1637+
:param context: The RequestContext
1638+
:param aggregate: The Aggregate object from the request to constrain
1639+
the host list
1640+
:param image_id: The IDs of the image to cache
1641+
"""
1642+
1643+
# TODO(danms): Fix notification sample for IMAGE_CACHE action
1644+
compute_utils.notify_about_aggregate_action(
1645+
context, aggregate,
1646+
fields.NotificationAction.IMAGE_CACHE,
1647+
fields.NotificationPhase.START)
1648+
1649+
clock = timeutils.StopWatch()
1650+
threads = CONF.image_cache.precache_concurrency
1651+
fetch_pool = eventlet.GreenPool(size=threads)
1652+
1653+
hosts_by_cell = {}
1654+
cells_by_uuid = {}
1655+
# TODO(danms): Make this a much more efficient bulk query
1656+
for hostname in aggregate.hosts:
1657+
hmap = objects.HostMapping.get_by_host(context, hostname)
1658+
cells_by_uuid.setdefault(hmap.cell_mapping.uuid, hmap.cell_mapping)
1659+
hosts_by_cell.setdefault(hmap.cell_mapping.uuid, [])
1660+
hosts_by_cell[hmap.cell_mapping.uuid].append(hostname)
1661+
1662+
LOG.info('Preparing to request pre-caching of image(s) %(image_ids)s '
1663+
'on %(hosts)i hosts across %(cells)s cells.',
1664+
{'image_ids': ','.join(image_ids),
1665+
'hosts': len(aggregate.hosts),
1666+
'cells': len(hosts_by_cell)})
1667+
clock.start()
1668+
1669+
for cell_uuid, hosts in hosts_by_cell.items():
1670+
cell = cells_by_uuid[cell_uuid]
1671+
with nova_context.target_cell(context, cell) as target_ctxt:
1672+
for host in hosts:
1673+
service = objects.Service.get_by_compute_host(target_ctxt,
1674+
host)
1675+
if not self.servicegroup_api.service_is_up(service):
1676+
LOG.info(
1677+
'Skipping image pre-cache request to compute '
1678+
'%(host)r because it is not up',
1679+
{'host': host})
1680+
continue
1681+
1682+
fetch_pool.spawn_n(self.compute_rpcapi.cache_images,
1683+
target_ctxt,
1684+
host=host,
1685+
image_ids=image_ids)
1686+
1687+
# Wait until all those things finish
1688+
fetch_pool.waitall()
1689+
1690+
clock.stop()
1691+
LOG.info('Image pre-cache operation for image(s) %(image_ids)s '
1692+
'completed in %(time).2f seconds',
1693+
{'image_ids': ','.join(image_ids),
1694+
'time': clock.elapsed()})
1695+
1696+
compute_utils.notify_about_aggregate_action(
1697+
context, aggregate,
1698+
fields.NotificationAction.IMAGE_CACHE,
1699+
fields.NotificationPhase.END)

nova/conductor/rpcapi.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from oslo_versionedobjects import base as ovo_base
2121

2222
import nova.conf
23+
from nova import exception
2324
from nova.objects import base as objects_base
2425
from nova import profiler
2526
from nova import rpc
@@ -281,6 +282,7 @@ class ComputeTaskAPI(object):
281282
instance.
282283
1.20 - migrate_server() now gets a 'host_list' parameter that represents
283284
potential alternate hosts for retries within a cell.
285+
1.21 - Added cache_images()
284286
"""
285287

286288
def __init__(self):
@@ -436,3 +438,12 @@ def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
436438
del kw['request_spec']
437439
cctxt = self.client.prepare(version=version)
438440
cctxt.cast(ctxt, 'rebuild_instance', **kw)
441+
442+
def cache_images(self, ctxt, aggregate, image_ids):
443+
version = '1.21'
444+
if not self.client.can_send_version(version):
445+
raise exception.NovaException('Conductor RPC version pin does not '
446+
'allow cache_images() to be called')
447+
cctxt = self.client.prepare(version=version)
448+
cctxt.cast(ctxt, 'cache_images', aggregate=aggregate,
449+
image_ids=image_ids)

nova/conf/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from nova.conf import glance
3636
from nova.conf import guestfs
3737
from nova.conf import hyperv
38+
from nova.conf import imagecache
3839
from nova.conf import ironic
3940
from nova.conf import key_manager
4041
from nova.conf import keystone
@@ -88,6 +89,7 @@
8889
guestfs.register_opts(CONF)
8990
hyperv.register_opts(CONF)
9091
mks.register_opts(CONF)
92+
imagecache.register_opts(CONF)
9193
ironic.register_opts(CONF)
9294
key_manager.register_opts(CONF)
9395
keystone.register_opts(CONF)

nova/conf/imagecache.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
2+
# not use this file except in compliance with the License. You may obtain
3+
# a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
# License for the specific language governing permissions and limitations
11+
# under the License.
12+
13+
from oslo_config import cfg
14+
15+
imagecache_group = cfg.OptGroup(
16+
'image_cache',
17+
title='Image Cache Options',
18+
help="""
19+
A collection of options specific to image caching.
20+
""")
21+
imagecache_opts = [
22+
cfg.IntOpt('precache_concurrency',
23+
default=1,
24+
min=1,
25+
help="""
26+
Maximum number of compute hosts to trigger image precaching in parallel.
27+
28+
When an image precache request is made, compute nodes will be contacted
29+
to initiate the download. This number constrains the number of those that
30+
will happen in parallel. Higher numbers will cause more computes to work
31+
in parallel and may result in reduced time to complete the operation, but
32+
may also DDoS the image service. Lower numbers will result in more sequential
33+
operation, lower image service load, but likely longer runtime to completion.
34+
"""),
35+
]
36+
37+
38+
ALL_OPTS = (imagecache_opts,)
39+
40+
41+
def register_opts(conf):
42+
conf.register_group(imagecache_group)
43+
conf.register_opts(imagecache_opts, group=imagecache_group)
44+
45+
46+
def list_opts():
47+
return {imagecache_group: imagecache_opts}

nova/notifications/objects/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ class EventType(NotificationObject):
7171
# NotificationActionField enum
7272
# Version 1.19: SELECT_DESTINATIONS is added to the NotificationActionField
7373
# enum
74-
VERSION = '1.19'
74+
# Version 1.20: IMAGE_CACHE is added to the NotificationActionField enum
75+
VERSION = '1.20'
7576

7677
fields = {
7778
'object': fields.StringField(nullable=False),

nova/objects/fields.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,7 @@ class NotificationAction(BaseNovaEnum):
864864
BUILD_INSTANCES = 'build_instances'
865865
MIGRATE_SERVER = 'migrate_server'
866866
REBUILD_SERVER = 'rebuild_server'
867+
IMAGE_CACHE = 'cache_images'
867868

868869
ALL = (UPDATE, EXCEPTION, DELETE, PAUSE, UNPAUSE, RESIZE, VOLUME_SWAP,
869870
SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH,
@@ -877,7 +878,7 @@ class NotificationAction(BaseNovaEnum):
877878
REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK,
878879
REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE,
879880
CONNECT, USAGE, BUILD_INSTANCES, MIGRATE_SERVER, REBUILD_SERVER,
880-
SELECT_DESTINATIONS)
881+
SELECT_DESTINATIONS, IMAGE_CACHE)
881882

882883

883884
# TODO(rlrossit): These should be changed over to be a StateMachine enum from
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
2+
# not use this file except in compliance with the License. You may obtain
3+
# a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
# License for the specific language governing permissions and limitations
11+
# under the License.
12+
13+
from oslo_utils.fixture import uuidsentinel as uuids
14+
15+
from nova import context
16+
from nova import objects
17+
from nova import test
18+
from nova.tests.unit import fake_notifier
19+
20+
21+
class ImageCacheTest(test.TestCase):
22+
NUMBER_OF_CELLS = 2
23+
24+
def setUp(self):
25+
super(ImageCacheTest, self).setUp()
26+
27+
self.flags(compute_driver='fake.FakeDriverWithCaching')
28+
29+
fake_notifier.stub_notifier(self)
30+
self.addCleanup(fake_notifier.reset)
31+
self.context = context.get_admin_context()
32+
33+
self.conductor = self.start_service('conductor')
34+
self.compute1 = self.start_service('compute', host='compute1')
35+
self.compute2 = self.start_service('compute', host='compute2')
36+
self.compute3 = self.start_service('compute', host='compute3',
37+
cell='cell2')
38+
self.compute4 = self.start_service('compute', host='compute4',
39+
cell='cell2')
40+
self.compute5 = self.start_service('compute', host='compute5',
41+
cell='cell2')
42+
43+
cell2 = self.cell_mappings['cell2']
44+
with context.target_cell(self.context, cell2) as cctxt:
45+
srv = objects.Service.get_by_compute_host(cctxt, 'compute5')
46+
srv.forced_down = True
47+
srv.save()
48+
49+
def test_cache_image(self):
50+
"""Test caching images by injecting the request directly to
51+
the conductor service and making sure it fans out and calls
52+
the expected nodes.
53+
"""
54+
55+
aggregate = objects.Aggregate(name='test',
56+
uuid=uuids.aggregate,
57+
id=1,
58+
hosts=['compute1', 'compute3',
59+
'compute4', 'compute5'])
60+
self.conductor.compute_task_mgr.cache_images(
61+
self.context, aggregate, ['an-image'])
62+
63+
# NOTE(danms): We expect only three image cache attempts because
64+
# compute5 is marked as forced-down and compute2 is not in the
65+
# requested aggregate.
66+
for host in ['compute1', 'compute3', 'compute4']:
67+
mgr = getattr(self, host)
68+
self.assertEqual(set(['an-image']), mgr.driver.cached_images)
69+
for host in ['compute2', 'compute5']:
70+
mgr = getattr(self, host)
71+
self.assertEqual(set(), mgr.driver.cached_images)
72+
73+
fake_notifier.wait_for_versioned_notifications(
74+
'aggregate.cache_images.start')
75+
fake_notifier.wait_for_versioned_notifications(
76+
'aggregate.cache_images.end')

0 commit comments

Comments
 (0)