Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ public class ReplicationConfig {
@Config(BACKUP_CHECKER_REPORT_DIR)
public final int maxBackupCheckerReportFd;

@Config(REPLICATION_ENABLE_PRIORITIZATION)
@Default("false")
public final boolean replicationEnablePrioritzation;
public final static String REPLICATION_ENABLE_PRIORITIZATION = "replication.enable.prioritization";

@Config(REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT)
@Default("100")
public final int replicationMaxPrioritizedReplicasPercent;
public final static String REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT = "replication.max.prioritized.replicas.percent";


public ReplicationConfig(VerifiableProperties verifiableProperties) {

maxReplicationRetryCount =
Expand Down Expand Up @@ -428,5 +439,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getBoolean(REPLICATION_USING_NONBLOCKING_NETWORK_CLIENT_FOR_REMOTE_COLO, false);
replicationUsingNonblockingNetworkClientForLocalColo =
verifiableProperties.getBoolean(REPLICATION_USING_NONBLOCKING_NETWORK_CLIENT_FOR_LOCAL_COLO, false);
replicationEnablePrioritzation = verifiableProperties.getBoolean(REPLICATION_ENABLE_PRIORITIZATION, false);
replicationMaxPrioritizedReplicasPercent = verifiableProperties.getInt(REPLICATION_MAX_PRIORITIZED_REPLICAS_PERCENT, 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,20 @@ public void replicateCyclic() {
standbyReplicasTimedOutOnNoProgress);

if (activeReplicasPerNode.size() > 0) {

// maxReplicaCountPerRequest = 20
List<List<RemoteReplicaInfo>> activeReplicaSubLists =
maxReplicaCountPerRequest > 0 ? Utils.partitionList(activeReplicasPerNode, maxReplicaCountPerRequest)
: Collections.singletonList(activeReplicasPerNode);
for (List<RemoteReplicaInfo> replicaSubList : activeReplicaSubLists) {
int size = replicaSubList.size();
if (replicationConfig.replicationEnablePrioritzation) {
int maxSize = replicationConfig.replicationMaxPrioritizedReplicasPercent/100 * size;
size = Math.min(size, maxSize);
}

RemoteReplicaGroup group =
new RemoteReplicaGroup(replicaSubList, remoteNode, false, remoteReplicaGroupId++);
new RemoteReplicaGroup(replicaSubList.subList(0, size), remoteNode, false, remoteReplicaGroupId++);
remoteReplicaGroups.add(group);
}
}
Expand Down Expand Up @@ -2226,7 +2234,7 @@ private void fillDataNodeTrackers() {

DataNodeTracker dataNodeTracker =
new DataNodeTracker(remoteHost, remoteReplicasPerNode, maxReplicaCountPerRequest, currentStartGroupId, time,
threadThrottleDurationMs);
threadThrottleDurationMs, replicationConfig.replicationEnablePrioritzation, replicationConfig.replicationMaxPrioritizedReplicasPercent);
logger.trace("Thread name: {} for datanode {} create datanode tracker {}", threadName, remoteHost,
dataNodeTracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DataNodeTracker {
* @param replicaThrottleDurationMs throttle duration for replicas
*/
public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteReplicas, int maxActiveGroupSize,
int startGroupId, Time time, long replicaThrottleDurationMs) {
int startGroupId, Time time, long replicaThrottleDurationMs, boolean isReplicaPrioritzationEnabled, int replicationMaxPrioritizedReplicas) {
this.dataNodeId = dataNodeId;
this.activeGroupTrackers = new ArrayList<>();

Expand All @@ -61,7 +61,13 @@ public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteRepl

// for each of smaller array of remote replicas create active group trackers with consecutive group ids
for (List<RemoteReplicaInfo> remoteReplicaList : remoteReplicaSegregatedList) {
ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId, remoteReplicaList.stream()
int size = remoteReplicaList.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets extract this code piece so that its a single point where this logic resides. Currently this is being evaluated at 2 places and any change in one place must be replicated to the other location. Roughly :-

list foo(inList, bool IsReplicationEnablePrioritzation, replicationMaxPrioritizedReplicasPercent) {
    int size = inList.size();
    if (IsReplicationEnablePrioritzation) {
             int maxSize = replicationMaxPrioritizedReplicasPercent/100 * size;
             size = Math.min(size, maxSize);
   }
   return inList.subList(0, size)
}

if (isReplicaPrioritzationEnabled) {
int maxSize = replicationMaxPrioritizedReplicas/100 * size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will not be able to get accurate data from this methodology. As replicas for partition can be in different threads and different data node tracker. So you will stop one replica and not stop another replica. Also after each iteration, the list could change and different partition will be picked up.

size = Math.min(size, maxSize);
}

ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId, remoteReplicaList.subList(0, size).stream()
.map(remoteReplicaInfo -> new ReplicaTracker(remoteReplicaInfo, time, replicaThrottleDurationMs))
.collect(Collectors.toList()));
activeGroupTrackers.add(activeGroupTracker);
Expand Down
Loading