Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Map<String, ResourceAssignment> getBaselineAssignment(AssignmentMetadataS
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
currentBaseline = assignmentMetadataStore.getBaseline();
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
Expand Down Expand Up @@ -88,10 +88,7 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment(
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBestAssignment =
assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
;
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.HashMap;
import java.util.Map;

import java.util.stream.Collectors;
import java.util.Objects;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
Expand All @@ -50,8 +50,8 @@ public class AssignmentMetadataStore {
private final String _baselinePath;
private final String _bestPossiblePath;
// volatile for double-checked locking
protected volatile Map<String, ResourceAssignment> _globalBaseline;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
protected volatile Map<String, ResourceAssignment> _globalBaseline = null;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment = null;
protected volatile int _bestPossibleVersion = 0;
protected volatile int _lastPersistedBestPossibleVersion = 0;

Expand All @@ -65,6 +65,9 @@ protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
}

/**
* @return a deep copy of the best possible assignment that is safe for modification.
*/
public Map<String, ResourceAssignment> getBaseline() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_globalBaseline == null) {
Expand All @@ -75,7 +78,11 @@ public Map<String, ResourceAssignment> getBaseline() {
}
}
}
return _globalBaseline;
Map<String, ResourceAssignment> result = new HashMap<>(_globalBaseline.size());
for (Map.Entry<String, ResourceAssignment> entry : _globalBaseline.entrySet()) {
result.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}
return result;
}

/**
Expand All @@ -88,6 +95,9 @@ protected boolean hasPersistedLatestBestPossibleAssignment() {
return _lastPersistedBestPossibleVersion == _bestPossibleVersion;
}

/**
* @return a deep copy of the best possible assignment that is safe for modification.
*/
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_bestPossibleAssignment == null) {
Expand All @@ -98,7 +108,12 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
}
}
}
return _bestPossibleAssignment;
// Return defensive copy so that the in-memory assignment is not modified by callers
Map<String, ResourceAssignment> result = new HashMap<>(_bestPossibleAssignment);
for (Map.Entry<String, ResourceAssignment> entry : _bestPossibleAssignment.entrySet()) {
result.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}
Comment on lines +112 to +115
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_bestPossibleAssignment.size()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this logic I have seen for multiple lines. Please make them in a function to be called in the place similar.

return result;
}

private Map<String, ResourceAssignment> fetchAssignmentOrDefault(String path) {
Expand Down Expand Up @@ -135,14 +150,15 @@ private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> ne
*/
public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// Create defensive copy so that the in-memory assignment is not modified after it is persisted
Map<String, ResourceAssignment> baselineCopy = globalBaseline.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
Map<String, ResourceAssignment> baselineCopy = new HashMap<>(globalBaseline.size());
for (Map.Entry<String, ResourceAssignment> entry : globalBaseline.entrySet()) {
baselineCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}

// write to metadata store
persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY);
// write to memory
getBaseline().clear();
getBaseline().putAll(baselineCopy);
_globalBaseline = baselineCopy;
}

/**
Expand All @@ -152,14 +168,14 @@ public synchronized void persistBaseline(Map<String, ResourceAssignment> globalB
*/
public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
// Create defensive copy so that the in-memory assignment is not modified after it is persisted
Map<String, ResourceAssignment> bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey,
entry -> new ResourceAssignment(entry.getValue().getRecord())));
Map<String, ResourceAssignment> bestPossibleAssignmentCopy = new HashMap<>(bestPossibleAssignment.size());
for (Map.Entry<String, ResourceAssignment> entry : bestPossibleAssignment.entrySet()) {
bestPossibleAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}
// write to metadata store
persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY);
// write to memory
getBestPossibleAssignment().clear();
getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy);
_bestPossibleAssignment = bestPossibleAssignmentCopy;
_bestPossibleVersion++;
_lastPersistedBestPossibleVersion = _bestPossibleVersion;
}
Expand All @@ -173,10 +189,13 @@ public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssig
*/
public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
Map<String, ResourceAssignment> bestPossibleAssignmentCopy = new HashMap<>(bestPossibleAssignment.size());
for (Map.Entry<String, ResourceAssignment> entry : bestPossibleAssignment.entrySet()) {
bestPossibleAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}
// Check if the version is stale by this point
if (newVersion > _bestPossibleVersion) {
getBestPossibleAssignment().clear();
getBestPossibleAssignment().putAll(bestPossibleAssignment);
_bestPossibleAssignment = bestPossibleAssignmentCopy;
_bestPossibleVersion = newVersion;
return true;
}
Expand All @@ -191,19 +210,13 @@ public int getBestPossibleVersion() {
public synchronized void clearAssignmentMetadata() {
persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, BASELINE_KEY);
persistAssignmentToMetadataStore(Collections.emptyMap(), _bestPossiblePath, BEST_POSSIBLE_KEY);
getBaseline().clear();
getBestPossibleAssignment().clear();
_globalBaseline = new HashMap<>();
_bestPossibleAssignment = new HashMap<>();
}

protected synchronized void reset() {
if (_bestPossibleAssignment != null) {
_bestPossibleAssignment.clear();
_bestPossibleAssignment = null;
}
if (_globalBaseline != null) {
_globalBaseline.clear();
_globalBaseline = null;
}
_bestPossibleAssignment = null;
_globalBaseline = null;
}

protected void finalize() {
Expand Down Expand Up @@ -248,10 +261,10 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
}

protected boolean isBaselineChanged(Map<String, ResourceAssignment> newBaseline) {
return !getBaseline().equals(newBaseline);
return !Objects.equals(_globalBaseline, newBaseline);
}

protected boolean isBestPossibleChanged(Map<String, ResourceAssignment> newBestPossible) {
return !getBestPossibleAssignment().equals(newBestPossible);
return !Objects.equals(_bestPossibleAssignment, newBestPossible);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* under the License.
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.BucketDataAccessor;
Expand All @@ -36,15 +36,15 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
}

public Map<String, ResourceAssignment> getBaseline() {
return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
return _globalBaseline == null ? new HashMap<>() : _globalBaseline;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
_globalBaseline = globalBaseline;
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment;
return _bestPossibleAssignment == null ? new HashMap<>() : _bestPossibleAssignment;
}

public void persistBestPossibleAssignment(
Expand Down