Skip to content

Commit 4c5da18

Browse files
sodonnelSlava Tutrinov
authored andcommitted
HDDS-12127. RM should not expire pending deletes, but retry until delete is confirmed or node is dead (apache#7746)
(cherry picked from commit 04f6255) Conflicts: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
1 parent 2c17910 commit 4c5da18

File tree

11 files changed

+491
-238
lines changed

11 files changed

+491
-238
lines changed

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdds.scm.container.replication;
1919

2020
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
21+
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
2122

2223
/**
2324
* Class to wrap details used to track pending replications.
@@ -34,19 +35,20 @@ public enum PendingOpType {
3435
private final PendingOpType opType;
3536
private final DatanodeDetails target;
3637
private final int replicaIndex;
38+
private final SCMCommand<?> command;
3739
private final long deadlineEpochMillis;
3840

3941
public static ContainerReplicaOp create(PendingOpType opType,
4042
DatanodeDetails target, int replicaIndex) {
41-
return new ContainerReplicaOp(opType, target, replicaIndex,
42-
System.currentTimeMillis());
43+
return new ContainerReplicaOp(opType, target, replicaIndex, null, System.currentTimeMillis());
4344
}
4445

4546
public ContainerReplicaOp(PendingOpType opType,
46-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
47+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
4748
this.opType = opType;
4849
this.target = target;
4950
this.replicaIndex = replicaIndex;
51+
this.command = command;
5052
this.deadlineEpochMillis = deadlineEpochMillis;
5153
}
5254

@@ -62,6 +64,10 @@ public int getReplicaIndex() {
6264
return replicaIndex;
6365
}
6466

67+
public SCMCommand<?> getCommand() {
68+
return command;
69+
}
70+
6571
public long getDeadlineEpochMillis() {
6672
return deadlineEpochMillis;
6773
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.hdds.client.ReplicationType;
2222
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2323
import org.apache.hadoop.hdds.scm.container.ContainerID;
24+
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
2425

2526
import java.time.Clock;
2627
import java.util.ArrayList;
@@ -116,28 +117,30 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
116117
* Store a ContainerReplicaOp to add a replica for the given ContainerID.
117118
* @param containerID ContainerID for which to add a replica
118119
* @param target The target datanode
119-
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
120+
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
121+
* @param command The command to send to the datanode
120122
* @param deadlineEpochMillis The time by which the replica should have been
121123
* added and reported by the datanode, or it will
122124
* be discarded.
123125
*/
124126
public void scheduleAddReplica(ContainerID containerID,
125-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
126-
addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis);
127+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
128+
addReplica(ADD, containerID, target, replicaIndex, command, deadlineEpochMillis);
127129
}
128130

129131
/**
130132
* Store a ContainerReplicaOp to delete a replica for the given ContainerID.
131133
* @param containerID ContainerID for which to delete a replica
132134
* @param target The target datanode
133-
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
135+
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
136+
* @param command The command to send to the datanode
134137
* @param deadlineEpochMillis The time by which the replica should have been
135138
* deleted and reported by the datanode, or it will
136139
* be discarded.
137140
*/
138141
public void scheduleDeleteReplica(ContainerID containerID,
139-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
140-
addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis);
142+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
143+
addReplica(DELETE, containerID, target, replicaIndex, command, deadlineEpochMillis);
141144
}
142145

143146
/**
@@ -150,7 +153,7 @@ public void scheduleDeleteReplica(ContainerID containerID,
150153
*/
151154
public boolean completeAddReplica(ContainerID containerID,
152155
DatanodeDetails target, int replicaIndex) {
153-
boolean completed = completeOp(ADD, containerID, target, replicaIndex);
156+
boolean completed = completeOp(ADD, containerID, target, replicaIndex, true);
154157
if (isMetricsNotNull() && completed) {
155158
if (isEC(replicaIndex)) {
156159
replicationMetrics.incrEcReplicasCreatedTotal();
@@ -172,7 +175,7 @@ public boolean completeAddReplica(ContainerID containerID,
172175
*/
173176
public boolean completeDeleteReplica(ContainerID containerID,
174177
DatanodeDetails target, int replicaIndex) {
175-
boolean completed = completeOp(DELETE, containerID, target, replicaIndex);
178+
boolean completed = completeOp(DELETE, containerID, target, replicaIndex, true);
176179
if (isMetricsNotNull() && completed) {
177180
if (isEC(replicaIndex)) {
178181
replicationMetrics.incrEcReplicasDeletedTotal();
@@ -192,7 +195,7 @@ public boolean completeDeleteReplica(ContainerID containerID,
192195
public boolean removeOp(ContainerID containerID,
193196
ContainerReplicaOp op) {
194197
return completeOp(op.getOpType(), containerID, op.getTarget(),
195-
op.getReplicaIndex());
198+
op.getReplicaIndex(), true);
196199
}
197200

198201
/**
@@ -221,9 +224,13 @@ public void removeExpiredEntries() {
221224
while (iterator.hasNext()) {
222225
ContainerReplicaOp op = iterator.next();
223226
if (clock.millis() > op.getDeadlineEpochMillis()) {
224-
iterator.remove();
227+
if (op.getOpType() != DELETE) {
228+
// For delete ops, we don't remove them from the list as RM must resend them, or they
229+
// will be removed via a container report when they are confirmed as deleted.
230+
iterator.remove();
231+
decrementCounter(op.getOpType(), op.getReplicaIndex());
232+
}
225233
expiredOps.add(op);
226-
decrementCounter(op.getOpType(), op.getReplicaIndex());
227234
updateTimeoutMetrics(op);
228235
}
229236
}
@@ -258,23 +265,26 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
258265
}
259266

260267
private void addReplica(ContainerReplicaOp.PendingOpType opType,
261-
ContainerID containerID, DatanodeDetails target, int replicaIndex,
268+
ContainerID containerID, DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
262269
long deadlineEpochMillis) {
263270
Lock lock = writeLock(containerID);
264271
lock(lock);
265272
try {
273+
// Remove any existing duplicate op for the same target and replicaIndex before adding
274+
// the new one. Especially for delete ops, they could be getting resent after expiry.
275+
completeOp(opType, containerID, target, replicaIndex, false);
266276
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
267277
containerID, s -> new ArrayList<>());
268278
ops.add(new ContainerReplicaOp(opType,
269-
target, replicaIndex, deadlineEpochMillis));
279+
target, replicaIndex, command, deadlineEpochMillis));
270280
incrementCounter(opType, replicaIndex);
271281
} finally {
272282
unlock(lock);
273283
}
274284
}
275285

276286
private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
277-
ContainerID containerID, DatanodeDetails target, int replicaIndex) {
287+
ContainerID containerID, DatanodeDetails target, int replicaIndex, boolean notifySubsribers) {
278288
boolean found = false;
279289
// List of completed ops that subscribers will be notified about
280290
List<ContainerReplicaOp> completedOps = new ArrayList<>();
@@ -303,7 +313,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
303313
unlock(lock);
304314
}
305315

306-
if (found) {
316+
if (found && notifySubsribers) {
307317
notifySubscribers(completedOps, containerID, false);
308318
}
309319
return found;

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ private void createReplicateCommand(
619619
private void adjustPendingOps(ECContainerReplicaCount replicaCount,
620620
DatanodeDetails target, int replicaIndex) {
621621
replicaCount.addPendingOp(new ContainerReplicaOp(
622-
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
622+
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null,
623623
Long.MAX_VALUE));
624624
}
625625

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,14 +1406,14 @@ private List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
14061406
if (inflightActions != null) {
14071407
for (InflightAction a : inflightActions) {
14081408
pendingOps.add(new ContainerReplicaOp(
1409-
ADD, a.getDatanode(), 0, Long.MAX_VALUE));
1409+
ADD, a.getDatanode(), 0, null, Long.MAX_VALUE));
14101410
}
14111411
}
14121412
inflightActions = inflightDeletion.get(containerID);
14131413
if (inflightActions != null) {
14141414
for (InflightAction a : inflightActions) {
14151415
pendingOps.add(new ContainerReplicaOp(
1416-
DELETE, a.getDatanode(), 0, Long.MAX_VALUE));
1416+
DELETE, a.getDatanode(), 0, null, Long.MAX_VALUE));
14171417
}
14181418
}
14191419
return pendingOps;

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
* that the containers are properly replicated. Replication Manager deals only
107107
* with Quasi Closed / Closed container.
108108
*/
109-
public class ReplicationManager implements SCMService {
109+
public class ReplicationManager implements SCMService, ContainerReplicaPendingOpsSubscriber {
110110

111111
public static final Logger LOG =
112112
LoggerFactory.getLogger(ReplicationManager.class);
@@ -698,8 +698,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
698698
if (cmd.getType() == Type.deleteContainerCommand) {
699699
DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
700700
containerReplicaPendingOps.scheduleDeleteReplica(
701-
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
702-
scmDeadlineEpochMs);
701+
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
703702
if (rcc.getReplicaIndex() > 0) {
704703
getMetrics().incrEcDeletionCmdsSentTotal();
705704
} else if (rcc.getReplicaIndex() == 0) {
@@ -712,8 +711,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
712711
byte[] targetIndexes = rcc.getMissingContainerIndexes();
713712
for (int i = 0; i < targetIndexes.length; i++) {
714713
containerReplicaPendingOps.scheduleAddReplica(
715-
containerInfo.containerID(), targets.get(i), targetIndexes[i],
716-
scmDeadlineEpochMs);
714+
containerInfo.containerID(), targets.get(i), targetIndexes[i], cmd, scmDeadlineEpochMs);
717715
}
718716
getMetrics().incrEcReconstructionCmdsSentTotal();
719717
} else if (cmd.getType() == Type.replicateContainerCommand) {
@@ -727,15 +725,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
727725
*/
728726
containerReplicaPendingOps.scheduleAddReplica(
729727
containerInfo.containerID(),
730-
targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs);
728+
targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
731729
} else {
732730
/*
733731
This means the source will push replica to the target, so the op's
734732
target Datanode should be the Datanode the replica will be pushed to.
735733
*/
736734
containerReplicaPendingOps.scheduleAddReplica(
737735
containerInfo.containerID(),
738-
rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs);
736+
rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
739737
}
740738

741739
if (rcc.getReplicaIndex() > 0) {
@@ -1079,6 +1077,27 @@ ReplicationQueue getQueue() {
10791077
return replicationQueue.get();
10801078
}
10811079

1080+
@Override
1081+
public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) {
1082+
if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) {
1083+
// We only care about expired delete ops. All others should be ignored.
1084+
return;
1085+
}
1086+
try {
1087+
ContainerInfo containerInfo = containerManager.getContainer(containerID);
1088+
// Sending the command in this way is un-throttled, and the command will have its deadline
1089+
// adjusted to a new deadline as part of the sending process.
1090+
sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget());
1091+
} catch (ContainerNotFoundException e) {
1092+
// Should not happen, as even deleted containers are currently retained in the SCM container map
1093+
LOG.error("Container {} not found when processing expired delete", containerID, e);
1094+
} catch (NotLeaderException e) {
1095+
// If SCM leadership has changed, this is fine to ignore. All pending ops will be expired
1096+
// once SCM leadership switches.
1097+
LOG.warn("SCM is not leader when processing expired delete", e);
1098+
}
1099+
}
1100+
10821101
/**
10831102
* Configuration used by the Replication Manager.
10841103
*/

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,9 @@ private void initializeSystemManagers(OzoneConfiguration conf,
836836
reconfigurationHandler.register(replicationManager.getConfig());
837837
}
838838
serviceManager.register(replicationManager);
839+
// RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it
840+
// so it can resend them.
841+
containerReplicaPendingOps.registerSubscriber(replicationManager);
839842
if (configurator.getScmSafeModeManager() != null) {
840843
scmSafeModeManager = configurator.getScmSafeModeManager();
841844
} else {

0 commit comments

Comments
 (0)