2727import java .util .Iterator ;
2828import java .util .List ;
2929import java .util .Map ;
30+ import java .util .Optional ;
3031import java .util .Set ;
32+ import java .util .stream .Collectors ;
3133
3234import org .apache .helix .HelixDefinedState ;
3335import org .apache .helix .PropertyKey ;
3436import org .apache .helix .controller .common .PartitionStateMap ;
3537import org .apache .helix .controller .dataproviders .ResourceControllerDataProvider ;
38+ import org .apache .helix .controller .rebalancer .util .WagedValidationUtil ;
3639import org .apache .helix .controller .rebalancer .waged .ReadOnlyWagedRebalancer ;
3740import org .apache .helix .controller .stages .AttributeName ;
3841import org .apache .helix .controller .stages .BestPossibleStateCalcStage ;
@@ -274,8 +277,21 @@ protected synchronized boolean verifyState() {
274277
275278 // Filter resources if requested
276279 if (_resources != null && !_resources .isEmpty ()) {
277- idealStates .keySet ().retainAll (_resources );
278- extViews .keySet ().retainAll (_resources );
280+ // Find if there are waged-enabled resources among the requested resources
281+ boolean hasRequestedWagedResources = _resources .stream ().anyMatch (
282+ resourceEntry -> WagedValidationUtil .isWagedEnabled (idealStates .get (resourceEntry )));
283+ Set <String > resourcesToRetain = new HashSet <>(_resources );
284+
285+ if (hasRequestedWagedResources ) {
286+ // If waged-enabled resources are found, retain all the waged-enabled resources and the
287+ // user requested resources.
288+ resourcesToRetain .addAll (idealStates .keySet ().stream ().filter (
289+ resourceEntry -> WagedValidationUtil .isWagedEnabled (idealStates .get (resourceEntry )))
290+ .collect (Collectors .toSet ()));
291+ }
292+
293+ idealStates .keySet ().retainAll (resourcesToRetain );
294+ extViews .keySet ().retainAll (resourcesToRetain );
279295 }
280296
281297 // if externalView is not empty and idealState doesn't exist
@@ -290,7 +306,7 @@ protected synchronized boolean verifyState() {
290306 }
291307
292308 // calculate best possible state
293- BestPossibleStateOutput bestPossOutput = calcBestPossState (_dataProvider , _resources );
309+ BestPossibleStateOutput bestPossOutput = calcBestPossState (_dataProvider , idealStates );
294310 Map <String , Map <Partition , Map <String , String >>> bestPossStateMap =
295311 bestPossOutput .getStateMap ();
296312
@@ -408,26 +424,26 @@ private Map<String, Map<String, String>> convertBestPossibleState(
408424 * kick off the BestPossibleStateCalcStage we are providing an empty current state map
409425 *
410426 * @param cache
411- * @param resources
427+ * @param resourceToIdealStateMap
412428 * @return
413429 * @throws Exception
414430 */
415- private BestPossibleStateOutput calcBestPossState (ResourceControllerDataProvider cache , Set < String > resources )
416- throws Exception {
431+ private BestPossibleStateOutput calcBestPossState (ResourceControllerDataProvider cache ,
432+ Map < String , IdealState > resourceToIdealStateMap ) throws Exception {
417433 ClusterEvent event = new ClusterEvent (_clusterName , ClusterEventType .StateVerifier );
418434 event .addAttribute (AttributeName .ControllerDataProvider .name (), cache );
419435
420436 RebalanceUtil .runStage (event , new ResourceComputationStage ());
421437
422- if (resources != null && !resources .isEmpty ()) {
438+ if (resourceToIdealStateMap != null && !resourceToIdealStateMap .isEmpty ()) {
423439 // Filtering out all non-required resources
424440 final Map <String , Resource > resourceMap = event .getAttribute (AttributeName .RESOURCES .name ());
425- resourceMap .keySet ().retainAll (resources );
441+ resourceMap .keySet ().retainAll (resourceToIdealStateMap . keySet () );
426442 event .addAttribute (AttributeName .RESOURCES .name (), resourceMap );
427443
428444 final Map <String , Resource > resourceMapToRebalance =
429445 event .getAttribute (AttributeName .RESOURCES_TO_REBALANCE .name ());
430- resourceMapToRebalance .keySet ().retainAll (resources );
446+ resourceMapToRebalance .keySet ().retainAll (resourceToIdealStateMap . keySet () );
431447 event .addAttribute (AttributeName .RESOURCES_TO_REBALANCE .name (), resourceMapToRebalance );
432448 }
433449
0 commit comments