Skip to content

Commit 55cda99

Browse files
sodonnel authored and Cyrill committed
HDDS-12127. RM should not expire pending deletes, but retry until delete is confirmed or node is dead (apache#7746)
1 parent f2987f7 commit 55cda99

File tree

10 files changed

+687
-513
lines changed

10 files changed

+687
-513
lines changed

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdds.scm.container.replication;
1919

2020
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
21+
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
2122

2223
/**
2324
* Class to wrap details used to track pending replications.
@@ -34,19 +35,20 @@ public enum PendingOpType {
3435
private final PendingOpType opType;
3536
private final DatanodeDetails target;
3637
private final int replicaIndex;
38+
private final SCMCommand<?> command;
3739
private final long deadlineEpochMillis;
3840

3941
public static ContainerReplicaOp create(PendingOpType opType,
4042
DatanodeDetails target, int replicaIndex) {
41-
return new ContainerReplicaOp(opType, target, replicaIndex,
42-
System.currentTimeMillis());
43+
return new ContainerReplicaOp(opType, target, replicaIndex, null, System.currentTimeMillis());
4344
}
4445

4546
public ContainerReplicaOp(PendingOpType opType,
46-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
47+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
4748
this.opType = opType;
4849
this.target = target;
4950
this.replicaIndex = replicaIndex;
51+
this.command = command;
5052
this.deadlineEpochMillis = deadlineEpochMillis;
5153
}
5254

@@ -62,6 +64,10 @@ public int getReplicaIndex() {
6264
return replicaIndex;
6365
}
6466

67+
public SCMCommand<?> getCommand() {
68+
return command;
69+
}
70+
6571
public long getDeadlineEpochMillis() {
6672
return deadlineEpochMillis;
6773
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.hdds.client.ReplicationType;
2222
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2323
import org.apache.hadoop.hdds.scm.container.ContainerID;
24+
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
2425

2526
import java.time.Clock;
2627
import java.util.ArrayList;
@@ -116,28 +117,30 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
116117
* Store a ContainerReplicaOp to add a replica for the given ContainerID.
117118
* @param containerID ContainerID for which to add a replica
118119
* @param target The target datanode
119-
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
120+
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
121+
* @param command The command to send to the datanode
120122
* @param deadlineEpochMillis The time by which the replica should have been
121123
* added and reported by the datanode, or it will
122124
* be discarded.
123125
*/
124126
public void scheduleAddReplica(ContainerID containerID,
125-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
126-
addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis);
127+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
128+
addReplica(ADD, containerID, target, replicaIndex, command, deadlineEpochMillis);
127129
}
128130

129131
/**
130132
* Store a ContainerReplicaOp to delete a replica for the given ContainerID.
131133
* @param containerID ContainerID for which to delete a replica
132134
* @param target The target datanode
133-
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
135+
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
136+
* @param command The command to send to the datanode
134137
* @param deadlineEpochMillis The time by which the replica should have been
135138
* deleted and reported by the datanode, or it will
136139
* be discarded.
137140
*/
138141
public void scheduleDeleteReplica(ContainerID containerID,
139-
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
140-
addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis);
142+
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
143+
addReplica(DELETE, containerID, target, replicaIndex, command, deadlineEpochMillis);
141144
}
142145

143146
/**
@@ -150,7 +153,7 @@ public void scheduleDeleteReplica(ContainerID containerID,
150153
*/
151154
public boolean completeAddReplica(ContainerID containerID,
152155
DatanodeDetails target, int replicaIndex) {
153-
boolean completed = completeOp(ADD, containerID, target, replicaIndex);
156+
boolean completed = completeOp(ADD, containerID, target, replicaIndex, true);
154157
if (isMetricsNotNull() && completed) {
155158
if (isEC(replicaIndex)) {
156159
replicationMetrics.incrEcReplicasCreatedTotal();
@@ -172,7 +175,7 @@ public boolean completeAddReplica(ContainerID containerID,
172175
*/
173176
public boolean completeDeleteReplica(ContainerID containerID,
174177
DatanodeDetails target, int replicaIndex) {
175-
boolean completed = completeOp(DELETE, containerID, target, replicaIndex);
178+
boolean completed = completeOp(DELETE, containerID, target, replicaIndex, true);
176179
if (isMetricsNotNull() && completed) {
177180
if (isEC(replicaIndex)) {
178181
replicationMetrics.incrEcReplicasDeletedTotal();
@@ -192,7 +195,7 @@ public boolean completeDeleteReplica(ContainerID containerID,
192195
public boolean removeOp(ContainerID containerID,
193196
ContainerReplicaOp op) {
194197
return completeOp(op.getOpType(), containerID, op.getTarget(),
195-
op.getReplicaIndex());
198+
op.getReplicaIndex(), true);
196199
}
197200

198201
/**
@@ -221,9 +224,13 @@ public void removeExpiredEntries() {
221224
while (iterator.hasNext()) {
222225
ContainerReplicaOp op = iterator.next();
223226
if (clock.millis() > op.getDeadlineEpochMillis()) {
224-
iterator.remove();
227+
if (op.getOpType() != DELETE) {
228+
// For delete ops, we don't remove them from the list as RM must resend them, or they
229+
// will be removed via a container report when they are confirmed as deleted.
230+
iterator.remove();
231+
decrementCounter(op.getOpType(), op.getReplicaIndex());
232+
}
225233
expiredOps.add(op);
226-
decrementCounter(op.getOpType(), op.getReplicaIndex());
227234
updateTimeoutMetrics(op);
228235
}
229236
}
@@ -258,23 +265,26 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
258265
}
259266

260267
private void addReplica(ContainerReplicaOp.PendingOpType opType,
261-
ContainerID containerID, DatanodeDetails target, int replicaIndex,
268+
ContainerID containerID, DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
262269
long deadlineEpochMillis) {
263270
Lock lock = writeLock(containerID);
264271
lock(lock);
265272
try {
273+
// Remove any existing duplicate op for the same target and replicaIndex before adding
274+
// the new one. Especially for delete ops, they could be getting resent after expiry.
275+
completeOp(opType, containerID, target, replicaIndex, false);
266276
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
267277
containerID, s -> new ArrayList<>());
268278
ops.add(new ContainerReplicaOp(opType,
269-
target, replicaIndex, deadlineEpochMillis));
279+
target, replicaIndex, command, deadlineEpochMillis));
270280
incrementCounter(opType, replicaIndex);
271281
} finally {
272282
unlock(lock);
273283
}
274284
}
275285

276286
private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
277-
ContainerID containerID, DatanodeDetails target, int replicaIndex) {
287+
ContainerID containerID, DatanodeDetails target, int replicaIndex, boolean notifySubsribers) {
278288
boolean found = false;
279289
// List of completed ops that subscribers will be notified about
280290
List<ContainerReplicaOp> completedOps = new ArrayList<>();
@@ -303,7 +313,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
303313
unlock(lock);
304314
}
305315

306-
if (found) {
316+
if (found && notifySubsribers) {
307317
notifySubscribers(completedOps, containerID, false);
308318
}
309319
return found;

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ private void createReplicateCommand(
619619
private void adjustPendingOps(ECContainerReplicaCount replicaCount,
620620
DatanodeDetails target, int replicaIndex) {
621621
replicaCount.addPendingOp(new ContainerReplicaOp(
622-
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
622+
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null,
623623
Long.MAX_VALUE));
624624
}
625625

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
* that the containers are properly replicated. Replication Manager deals only
107107
* with Quasi Closed / Closed container.
108108
*/
109-
public class ReplicationManager implements SCMService {
109+
public class ReplicationManager implements SCMService, ContainerReplicaPendingOpsSubscriber {
110110

111111
public static final Logger LOG =
112112
LoggerFactory.getLogger(ReplicationManager.class);
@@ -700,8 +700,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
700700
if (cmd.getType() == Type.deleteContainerCommand) {
701701
DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
702702
containerReplicaPendingOps.scheduleDeleteReplica(
703-
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
704-
scmDeadlineEpochMs);
703+
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
705704
if (rcc.getReplicaIndex() > 0) {
706705
getMetrics().incrEcDeletionCmdsSentTotal();
707706
} else if (rcc.getReplicaIndex() == 0) {
@@ -714,7 +713,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
714713
byte[] targetIndexes = rcc.getMissingContainerIndexes();
715714
for (int i = 0; i < targetIndexes.length; i++) {
716715
containerReplicaPendingOps.scheduleAddReplica(
717-
containerInfo.containerID(), targets.get(i), targetIndexes[i],
716+
containerInfo.containerID(), targets.get(i), targetIndexes[i], cmd,
718717
scmDeadlineEpochMs);
719718
}
720719
getMetrics().incrEcReconstructionCmdsSentTotal();
@@ -729,15 +728,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
729728
*/
730729
containerReplicaPendingOps.scheduleAddReplica(
731730
containerInfo.containerID(),
732-
targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs);
731+
targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
733732
} else {
734733
/*
735734
This means the source will push replica to the target, so the op's
736735
target Datanode should be the Datanode the replica will be pushed to.
737736
*/
738737
containerReplicaPendingOps.scheduleAddReplica(
739738
containerInfo.containerID(),
740-
rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs);
739+
rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
741740
}
742741

743742
if (rcc.getReplicaIndex() > 0) {
@@ -1089,6 +1088,27 @@ ReplicationQueue getQueue() {
10891088
return replicationQueue.get();
10901089
}
10911090

1091+
@Override
1092+
public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) {
1093+
if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) {
1094+
// We only care about expired delete ops. All others should be ignored.
1095+
return;
1096+
}
1097+
try {
1098+
ContainerInfo containerInfo = containerManager.getContainer(containerID);
1099+
// Sending the command in this way is un-throttled, and the command will have its deadline
1100+
// adjusted to a new deadline as part of the sending process.
1101+
sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget());
1102+
} catch (ContainerNotFoundException e) {
1103+
// Should not happen, as even deleted containers are currently retained in the SCM container map
1104+
LOG.error("Container {} not found when processing expired delete", containerID, e);
1105+
} catch (NotLeaderException e) {
1106+
// If SCM leadership has changed, this is fine to ignore. All pending ops will be expired
1107+
// once SCM leadership switches.
1108+
LOG.warn("SCM is not leader when processing expired delete", e);
1109+
}
1110+
}
1111+
10921112
/**
10931113
* Configuration used by the Replication Manager.
10941114
*/

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,9 @@ private void initializeSystemManagers(OzoneConfiguration conf,
836836
reconfigurationHandler.register(replicationManager.getConfig());
837837
}
838838
serviceManager.register(replicationManager);
839+
// RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it
840+
// so it can resend them.
841+
containerReplicaPendingOps.registerSubscriber(replicationManager);
839842
if (configurator.getScmSafeModeManager() != null) {
840843
scmSafeModeManager = configurator.getScmSafeModeManager();
841844
} else {

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,13 @@ public void testMovePendingOpsExist() throws Exception {
195195
nodes.put(src, NodeStatus.inServiceHealthy());
196196
nodes.put(tgt, NodeStatus.inServiceHealthy());
197197

198-
pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, clock.millis()));
198+
pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, null, clock.millis()));
199199

200200
assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_REPLICATION,
201201
containerInfo.containerID());
202202

203203
pendingOps.clear();
204-
pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, clock.millis()));
204+
pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, null, clock.millis()));
205205
assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_DELETION,
206206
containerInfo.containerID());
207207
}
@@ -321,7 +321,7 @@ public void testDeleteCommandFails() throws Exception {
321321
.when(containerManager).getContainer(any(ContainerID.class));
322322

323323
ContainerReplicaOp op = new ContainerReplicaOp(
324-
ADD, tgt, 0, clock.millis() + 1000);
324+
ADD, tgt, 0, null, clock.millis() + 1000);
325325
moveManager.opCompleted(op, containerInfo.containerID(), false);
326326

327327
MoveManager.MoveResult moveResult = res.get();
@@ -333,14 +333,14 @@ public void testSuccessfulMove() throws Exception {
333333
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
334334

335335
ContainerReplicaOp op = new ContainerReplicaOp(
336-
ADD, tgt, 0, clock.millis() + 1000);
336+
ADD, tgt, 0, null, clock.millis() + 1000);
337337
moveManager.opCompleted(op, containerInfo.containerID(), false);
338338

339339
Mockito.verify(replicationManager).sendDeleteCommand(
340340
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
341341

342342
op = new ContainerReplicaOp(
343-
DELETE, src, 0, clock.millis() + 1000);
343+
DELETE, src, 0, null, clock.millis() + 1000);
344344
moveManager.opCompleted(op, containerInfo.containerID(), false);
345345

346346
MoveManager.MoveResult finalResult = res.get();
@@ -370,15 +370,15 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception {
370370
anyLong());
371371

372372
ContainerReplicaOp op = new ContainerReplicaOp(
373-
ADD, tgt, srcReplica.getReplicaIndex(), clock.millis() + 1000);
373+
ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
374374
moveManager.opCompleted(op, containerInfo.containerID(), false);
375375

376376
Mockito.verify(replicationManager).sendDeleteCommand(
377377
eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src),
378378
eq(true), anyLong());
379379

380380
op = new ContainerReplicaOp(
381-
DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000);
381+
DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
382382
moveManager.opCompleted(op, containerInfo.containerID(), false);
383383

384384
MoveManager.MoveResult finalResult = res.get();
@@ -390,7 +390,7 @@ public void testMoveTimeoutOnAdd() throws Exception {
390390
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
391391

392392
ContainerReplicaOp op = new ContainerReplicaOp(
393-
ADD, tgt, 0, clock.millis() + 1000);
393+
ADD, tgt, 0, null, clock.millis() + 1000);
394394
moveManager.opCompleted(op, containerInfo.containerID(), true);
395395

396396
MoveManager.MoveResult finalResult = res.get();
@@ -402,14 +402,14 @@ public void testMoveTimeoutOnDelete() throws Exception {
402402
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
403403

404404
ContainerReplicaOp op = new ContainerReplicaOp(
405-
ADD, tgt, 0, clock.millis() + 1000);
405+
ADD, tgt, 0, null, clock.millis() + 1000);
406406
moveManager.opCompleted(op, containerInfo.containerID(), false);
407407

408408
Mockito.verify(replicationManager).sendDeleteCommand(
409409
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
410410

411411
op = new ContainerReplicaOp(
412-
DELETE, src, 0, clock.millis() + 1000);
412+
DELETE, src, 0, null, clock.millis() + 1000);
413413
moveManager.opCompleted(op, containerInfo.containerID(), true);
414414

415415
MoveManager.MoveResult finalResult = res.get();
@@ -430,7 +430,7 @@ public void testMoveCompleteSrcNoLongerPresent() throws Exception {
430430
}
431431
}
432432
ContainerReplicaOp op = new ContainerReplicaOp(
433-
ADD, tgt, 0, clock.millis() + 1000);
433+
ADD, tgt, 0, null, clock.millis() + 1000);
434434
moveManager.opCompleted(op, containerInfo.containerID(), false);
435435

436436
MoveManager.MoveResult finalResult = res.get();
@@ -446,7 +446,7 @@ public void testMoveCompleteSrcNotHealthy() throws Exception {
446446

447447
nodes.put(src, NodeStatus.inServiceStale());
448448
ContainerReplicaOp op = new ContainerReplicaOp(
449-
ADD, tgt, 0, clock.millis() + 1000);
449+
ADD, tgt, 0, null, clock.millis() + 1000);
450450
moveManager.opCompleted(op, containerInfo.containerID(), false);
451451

452452
MoveManager.MoveResult finalResult = res.get();
@@ -464,7 +464,7 @@ public void testMoveCompleteSrcNotInService() throws Exception {
464464
HddsProtos.NodeOperationalState.DECOMMISSIONING,
465465
HddsProtos.NodeState.HEALTHY));
466466
ContainerReplicaOp op = new ContainerReplicaOp(
467-
ADD, tgt, 0, clock.millis() + 1000);
467+
ADD, tgt, 0, null, clock.millis() + 1000);
468468
moveManager.opCompleted(op, containerInfo.containerID(), false);
469469

470470
MoveManager.MoveResult finalResult = res.get();
@@ -483,7 +483,7 @@ public void testMoveCompleteFutureReplicasUnhealthy() throws Exception {
483483
.MisReplicatedHealthResult(containerInfo, false, null));
484484

485485
ContainerReplicaOp op = new ContainerReplicaOp(
486-
ADD, tgt, 0, clock.millis() + 1000);
486+
ADD, tgt, 0, null, clock.millis() + 1000);
487487
moveManager.opCompleted(op, containerInfo.containerID(), false);
488488

489489
MoveManager.MoveResult finalResult = res.get();

0 commit comments

Comments
 (0)