Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Map<String, ResourceAssignment> getBaselineAssignment(AssignmentMetadataS
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
currentBaseline = assignmentMetadataStore.getBaseline();
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
Expand Down Expand Up @@ -88,10 +88,7 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment(
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBestAssignment =
assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
;
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;

import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
Expand All @@ -50,8 +51,8 @@ public class AssignmentMetadataStore {
private final String _baselinePath;
private final String _bestPossiblePath;
// volatile for double-checked locking
protected volatile Map<String, ResourceAssignment> _globalBaseline;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
protected volatile Map<String, ResourceAssignment> _globalBaseline = null;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment = null;
protected volatile int _bestPossibleVersion = 0;
protected volatile int _lastPersistedBestPossibleVersion = 0;

Expand All @@ -65,6 +66,9 @@ protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
}

/**
* @return a deep copy of the best possible assignment that is safe for modification.
*/
public Map<String, ResourceAssignment> getBaseline() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_globalBaseline == null) {
Expand All @@ -75,7 +79,9 @@ public Map<String, ResourceAssignment> getBaseline() {
}
}
}
return _globalBaseline;
return _globalBaseline.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try not implementing lambda function in core piece. It hurts the performance.

}

/**
Expand All @@ -88,6 +94,9 @@ protected boolean hasPersistedLatestBestPossibleAssignment() {
return _lastPersistedBestPossibleVersion == _bestPossibleVersion;
}

/**
* @return a deep copy of the best possible assignment that is safe for modification.
*/
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_bestPossibleAssignment == null) {
Expand All @@ -98,7 +107,10 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
}
}
}
return _bestPossibleAssignment;
// Return defensive copy so that the in-memory assignment is not modified by callers
return _bestPossibleAssignment.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
}

private Map<String, ResourceAssignment> fetchAssignmentOrDefault(String path) {
Expand Down Expand Up @@ -141,8 +153,7 @@ public synchronized void persistBaseline(Map<String, ResourceAssignment> globalB
// write to metadata store
persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY);
// write to memory
getBaseline().clear();
getBaseline().putAll(baselineCopy);
_globalBaseline = baselineCopy;
}

/**
Expand All @@ -158,8 +169,7 @@ public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssig
// write to metadata store
persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY);
// write to memory
getBestPossibleAssignment().clear();
getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy);
_bestPossibleAssignment = bestPossibleAssignmentCopy;
_bestPossibleVersion++;
_lastPersistedBestPossibleVersion = _bestPossibleVersion;
}
Expand All @@ -173,10 +183,12 @@ public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssig
*/
public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
Map<String, ResourceAssignment> bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
// Check if the version is stale by this point
if (newVersion > _bestPossibleVersion) {
getBestPossibleAssignment().clear();
getBestPossibleAssignment().putAll(bestPossibleAssignment);
_bestPossibleAssignment = bestPossibleAssignmentCopy;
_bestPossibleVersion = newVersion;
return true;
}
Expand All @@ -191,19 +203,13 @@ public int getBestPossibleVersion() {
public synchronized void clearAssignmentMetadata() {
persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, BASELINE_KEY);
persistAssignmentToMetadataStore(Collections.emptyMap(), _bestPossiblePath, BEST_POSSIBLE_KEY);
getBaseline().clear();
getBestPossibleAssignment().clear();
_globalBaseline = new HashMap<>();
_bestPossibleAssignment = new HashMap<>();
}

protected synchronized void reset() {
if (_bestPossibleAssignment != null) {
_bestPossibleAssignment.clear();
_bestPossibleAssignment = null;
}
if (_globalBaseline != null) {
_globalBaseline.clear();
_globalBaseline = null;
}
_bestPossibleAssignment = null;
_globalBaseline = null;
}

protected void finalize() {
Expand Down Expand Up @@ -248,10 +254,10 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
}

protected boolean isBaselineChanged(Map<String, ResourceAssignment> newBaseline) {
return !getBaseline().equals(newBaseline);
return !Objects.equals(_globalBaseline, newBaseline);
}

protected boolean isBestPossibleChanged(Map<String, ResourceAssignment> newBestPossible) {
return !getBestPossibleAssignment().equals(newBestPossible);
return !Objects.equals(_bestPossibleAssignment, newBestPossible);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* under the License.
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.BucketDataAccessor;
Expand All @@ -36,15 +36,15 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
}

public Map<String, ResourceAssignment> getBaseline() {
return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
return _globalBaseline == null ? new HashMap<>() : _globalBaseline;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
_globalBaseline = globalBaseline;
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment;
return _bestPossibleAssignment == null ? new HashMap<>() : _bestPossibleAssignment;
}

public void persistBestPossibleAssignment(
Expand Down
Loading