From 786daef1682564b9d3d4614b5561c1cbc9278f7c Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 22 Jan 2025 11:28:20 +0000 Subject: [PATCH 1/6] Overwrite duplicate ops if another scheduled for same opType, target and index --- .../ContainerReplicaPendingOps.java | 13 ++++--- .../TestContainerReplicaPendingOps.java | 39 ++++++++++++++++++- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 4eef0a8a744c..84d186868860 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -150,7 +150,7 @@ public void scheduleDeleteReplica(ContainerID containerID, */ public boolean completeAddReplica(ContainerID containerID, DatanodeDetails target, int replicaIndex) { - boolean completed = completeOp(ADD, containerID, target, replicaIndex); + boolean completed = completeOp(ADD, containerID, target, replicaIndex, true); if (isMetricsNotNull() && completed) { if (isEC(replicaIndex)) { replicationMetrics.incrEcReplicasCreatedTotal(); @@ -172,7 +172,7 @@ public boolean completeAddReplica(ContainerID containerID, */ public boolean completeDeleteReplica(ContainerID containerID, DatanodeDetails target, int replicaIndex) { - boolean completed = completeOp(DELETE, containerID, target, replicaIndex); + boolean completed = completeOp(DELETE, containerID, target, replicaIndex, true); if (isMetricsNotNull() && completed) { if (isEC(replicaIndex)) { replicationMetrics.incrEcReplicasDeletedTotal(); @@ -192,7 +192,7 @@ public boolean completeDeleteReplica(ContainerID containerID, public boolean removeOp(ContainerID containerID, ContainerReplicaOp 
op) { return completeOp(op.getOpType(), containerID, op.getTarget(), - op.getReplicaIndex()); + op.getReplicaIndex(), true); } /** @@ -263,6 +263,9 @@ private void addReplica(ContainerReplicaOp.PendingOpType opType, Lock lock = writeLock(containerID); lock(lock); try { + // Remove any existing duplicate op for the same target and replicaIndex before adding + // the new one. Especially for delete ops, they could be getting resent after expiry. + completeOp(opType, containerID, target, replicaIndex, false); List ops = pendingOps.computeIfAbsent( containerID, s -> new ArrayList<>()); ops.add(new ContainerReplicaOp(opType, @@ -274,7 +277,7 @@ private void addReplica(ContainerReplicaOp.PendingOpType opType, } private boolean completeOp(ContainerReplicaOp.PendingOpType opType, - ContainerID containerID, DatanodeDetails target, int replicaIndex) { + ContainerID containerID, DatanodeDetails target, int replicaIndex, boolean notifySubsribers) { boolean found = false; // List of completed ops that subscribers will be notified about List completedOps = new ArrayList<>(); @@ -303,7 +306,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType, unlock(lock); } - if (found) { + if (found && notifySubsribers) { notifySubscribers(completedOps, containerID, false); } return found; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java index 3775531d30d1..df122682dfe1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java @@ -111,14 +111,27 @@ public void testCanAddReplicasForAdd() { pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline); 
pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline); pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline); + // Duplicate for DN2 + pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline + 1); + // Not a duplicate for DN2 as different index. Should not happen in practice as it is not valid to have 2 indexes + // on the same node. + pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 1, deadline); pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline); + pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline + 1); List ops = pendingOps.getPendingOps(new ContainerID(1)); - assertEquals(3, ops.size()); + assertEquals(4, ops.size()); for (ContainerReplicaOp op : ops) { - assertEquals(0, op.getReplicaIndex()); + if (!op.getTarget().equals(dn2)) { + assertEquals(0, op.getReplicaIndex()); + } assertEquals(ADD, op.getOpType()); + if (op.getTarget().equals(dn2) && op.getReplicaIndex() == 0) { + assertEquals(deadline + 1, op.getDeadlineEpochMillis()); + } else { + assertEquals(deadline, op.getDeadlineEpochMillis()); + } } List allDns = ops.stream() .map(ContainerReplicaOp::getTarget).collect(Collectors.toList()); @@ -131,6 +144,7 @@ public void testCanAddReplicasForAdd() { assertEquals(1, ops.get(0).getReplicaIndex()); assertEquals(ADD, ops.get(0).getOpType()); assertEquals(dn1, ops.get(0).getTarget()); + assertEquals(deadline + 1, ops.get(0).getDeadlineEpochMillis()); } @Test @@ -397,4 +411,25 @@ public void subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() { // subscriber verifyNoMoreInteractions(subscriber1); } + + @Test + public void subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() { + ContainerID containerID = new ContainerID(1); + + // schedule ops + pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline); + + // register subscriber + ContainerReplicaPendingOpsSubscriber subscriber1 = mock( + ContainerReplicaPendingOpsSubscriber.class); + 
pendingOps.registerSubscriber(subscriber1); + + clock.fastForward(1000); + pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline + 1); + // no entries have expired, so there should be zero interactions with the + // subscriber + verifyNoMoreInteractions(subscriber1); + } + + } From 6893400b1df8f36e95a76d0ca02e3ffb05d8daaf Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 22 Jan 2025 17:32:06 +0000 Subject: [PATCH 2/6] Add SCMCommand to ContainerReplicaOp and ContainerReplicaPendingOps --- .../replication/ContainerReplicaOp.java | 12 +- .../ContainerReplicaPendingOps.java | 18 ++- .../ECUnderReplicationHandler.java | 2 +- .../replication/ReplicationManager.java | 10 +- .../container/balancer/TestMoveManager.java | 28 ++--- .../TestContainerReplicaPendingOps.java | 104 ++++++++++-------- .../TestECContainerReplicaCount.java | 4 +- .../replication/TestReplicationManager.java | 8 +- .../TestReplicationManagerScenarios.java | 4 +- 9 files changed, 104 insertions(+), 86 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java index 34cc01eb8938..99fcda97b2ee 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; /** * Class to wrap details used to track pending replications. 
@@ -34,19 +35,20 @@ public enum PendingOpType { private final PendingOpType opType; private final DatanodeDetails target; private final int replicaIndex; + private final SCMCommand command; private final long deadlineEpochMillis; public static ContainerReplicaOp create(PendingOpType opType, DatanodeDetails target, int replicaIndex) { - return new ContainerReplicaOp(opType, target, replicaIndex, - System.currentTimeMillis()); + return new ContainerReplicaOp(opType, target, replicaIndex, null, System.currentTimeMillis()); } public ContainerReplicaOp(PendingOpType opType, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { + DatanodeDetails target, int replicaIndex, SCMCommand command, long deadlineEpochMillis) { this.opType = opType; this.target = target; this.replicaIndex = replicaIndex; + this.command = command; this.deadlineEpochMillis = deadlineEpochMillis; } @@ -62,6 +64,10 @@ public int getReplicaIndex() { return replicaIndex; } + public SCMCommand getCommand() { + return command; + } + public long getDeadlineEpochMillis() { return deadlineEpochMillis; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 84d186868860..0a2d1d57021b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -21,6 +21,10 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import 
org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.jheaps.annotations.VisibleForTesting; import java.time.Clock; import java.util.ArrayList; @@ -117,13 +121,14 @@ public List getPendingOps(ContainerID containerID) { * @param containerID ContainerID for which to add a replica * @param target The target datanode * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) + * @param command The command to send to the datanode * @param deadlineEpochMillis The time by which the replica should have been * added and reported by the datanode, or it will * be discarded. */ public void scheduleAddReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { - addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis); + DatanodeDetails target, int replicaIndex, SCMCommand command, long deadlineEpochMillis) { + addReplica(ADD, containerID, target, replicaIndex, command, deadlineEpochMillis); } /** @@ -131,13 +136,14 @@ public void scheduleAddReplica(ContainerID containerID, * @param containerID ContainerID for which to delete a replica * @param target The target datanode * @param replicaIndex The replica index (zero for Ratis, > 0 for EC) + * @param command The command to send to the datanode * @param deadlineEpochMillis The time by which the replica should have been * deleted and reported by the datanode, or it will * be discarded. 
*/ public void scheduleDeleteReplica(ContainerID containerID, - DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) { - addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis); + DatanodeDetails target, int replicaIndex, SCMCommand command, long deadlineEpochMillis) { + addReplica(DELETE, containerID, target, replicaIndex, command, deadlineEpochMillis); } /** @@ -258,7 +264,7 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) { } private void addReplica(ContainerReplicaOp.PendingOpType opType, - ContainerID containerID, DatanodeDetails target, int replicaIndex, + ContainerID containerID, DatanodeDetails target, int replicaIndex, SCMCommand command, long deadlineEpochMillis) { Lock lock = writeLock(containerID); lock(lock); @@ -269,7 +275,7 @@ private void addReplica(ContainerReplicaOp.PendingOpType opType, List ops = pendingOps.computeIfAbsent( containerID, s -> new ArrayList<>()); ops.add(new ContainerReplicaOp(opType, - target, replicaIndex, deadlineEpochMillis)); + target, replicaIndex, command, deadlineEpochMillis)); incrementCounter(opType, replicaIndex); } finally { unlock(lock); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java index 2f77891046d7..09a757f5774e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java @@ -621,7 +621,7 @@ private void createReplicateCommand( private void adjustPendingOps(ECContainerReplicaCount replicaCount, DatanodeDetails target, int replicaIndex) { replicaCount.addPendingOp(new ContainerReplicaOp( - ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, + 
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null, Long.MAX_VALUE)); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index d183c876e956..a63c5313f7b0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -673,8 +673,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, if (cmd.getType() == Type.deleteContainerCommand) { DeleteContainerCommand rcc = (DeleteContainerCommand) cmd; containerReplicaPendingOps.scheduleDeleteReplica( - containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), - scmDeadlineEpochMs); + containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs); if (rcc.getReplicaIndex() > 0) { getMetrics().incrEcDeletionCmdsSentTotal(); } else if (rcc.getReplicaIndex() == 0) { @@ -687,8 +686,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, final ByteString targetIndexes = rcc.getMissingContainerIndexes(); for (int i = 0; i < targetIndexes.size(); i++) { containerReplicaPendingOps.scheduleAddReplica( - containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), - scmDeadlineEpochMs); + containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs); } getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { @@ -702,7 +700,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, */ containerReplicaPendingOps.scheduleAddReplica( containerInfo.containerID(), - targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs); + targetDatanode, rcc.getReplicaIndex(), cmd, 
scmDeadlineEpochMs); } else { /* This means the source will push replica to the target, so the op's @@ -710,7 +708,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, */ containerReplicaPendingOps.scheduleAddReplica( containerInfo.containerID(), - rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs); + rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs); } if (rcc.getReplicaIndex() > 0) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java index 0c5667d407d3..2401dc511db5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java @@ -199,13 +199,13 @@ public void testMovePendingOpsExist() throws Exception { nodes.put(src, NodeStatus.inServiceHealthy()); nodes.put(tgt, NodeStatus.inServiceHealthy()); - pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, clock.millis())); + pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, null, clock.millis())); assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_REPLICATION, containerInfo.containerID()); pendingOps.clear(); - pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, clock.millis())); + pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, null, clock.millis())); assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_DELETION, containerInfo.containerID()); } @@ -325,7 +325,7 @@ public void testDeleteCommandFails() throws Exception { .when(containerManager).getContainer(any(ContainerID.class)); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult moveResult = res.get(); @@ -337,14 
+337,14 @@ public void testSuccessfulMove() throws Exception { CompletableFuture res = setupSuccessfulMove(); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); verify(replicationManager).sendDeleteCommand( eq(containerInfo), eq(0), eq(src), eq(true), anyLong()); op = new ContainerReplicaOp( - DELETE, src, 0, clock.millis() + 1000); + DELETE, src, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); @@ -374,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception { anyLong()); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, srcReplica.getReplicaIndex(), clock.millis() + 1000); + ADD, tgt, srcReplica.getReplicaIndex(), null,clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); verify(replicationManager).sendDeleteCommand( @@ -382,7 +382,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception { eq(true), anyLong()); op = new ContainerReplicaOp( - DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000); + DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); @@ -394,7 +394,7 @@ public void testMoveTimeoutOnAdd() throws Exception { CompletableFuture res = setupSuccessfulMove(); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), true); MoveManager.MoveResult finalResult = res.get(); @@ -406,14 +406,14 @@ public void testMoveTimeoutOnDelete() throws Exception { CompletableFuture res = setupSuccessfulMove(); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, 
clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); verify(replicationManager).sendDeleteCommand( eq(containerInfo), eq(0), eq(src), eq(true), anyLong()); op = new ContainerReplicaOp( - DELETE, src, 0, clock.millis() + 1000); + DELETE, src, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), true); MoveManager.MoveResult finalResult = res.get(); @@ -434,7 +434,7 @@ public void testMoveCompleteSrcNoLongerPresent() throws Exception { } } ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); @@ -450,7 +450,7 @@ public void testMoveCompleteSrcNotHealthy() throws Exception { nodes.put(src, NodeStatus.inServiceStale()); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); @@ -468,7 +468,7 @@ public void testMoveCompleteSrcNotInService() throws Exception { HddsProtos.NodeOperationalState.DECOMMISSIONING, HddsProtos.NodeState.HEALTHY)); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); @@ -487,7 +487,7 @@ public void testMoveCompleteFutureReplicasUnhealthy() throws Exception { .MisReplicatedHealthResult(containerInfo, false, null)); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, 0, clock.millis() + 1000); + ADD, tgt, 0, null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); MoveManager.MoveResult finalResult = res.get(); diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java index df122682dfe1..fec80923e8dc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java @@ -23,6 +23,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -56,6 +59,8 @@ public class TestContainerReplicaPendingOps { private DatanodeDetails dn3; private ReplicationManagerMetrics metrics; private long deadline; + private SCMCommand addCmd; + private SCMCommand deleteCmd; @BeforeEach public void setup() { @@ -73,6 +78,9 @@ public void setup() { dn1 = MockDatanodeDetails.randomDatanodeDetails(); dn2 = MockDatanodeDetails.randomDatanodeDetails(); dn3 = MockDatanodeDetails.randomDatanodeDetails(); + + addCmd = ReplicateContainerCommand.toTarget(1, dn3); + deleteCmd = new DeleteContainerCommand(1, false); } @AfterEach @@ -91,8 +99,8 @@ public void testGetPendingOpsReturnsEmptyList() { @Test public void testClear() { - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, deadline); + 
pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deleteCmd, deadline); assertEquals(1, pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD)); assertEquals(1, pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.DELETE)); @@ -108,16 +116,16 @@ public void testClear() { @Test public void testCanAddReplicasForAdd() { - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, deadline); // Duplicate for DN2 - pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline + 1); + pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd, deadline + 1); // Not a duplicate for DN2 as different index. Should not happen in practice as it is not valid to have 2 indexes // on the same node. 
- pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 1, deadline); - pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline); - pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline + 1); + pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 1, addCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, deadline + 1); List ops = pendingOps.getPendingOps(new ContainerID(1)); @@ -149,10 +157,10 @@ public void testCanAddReplicasForAdd() { @Test public void testCanAddReplicasForDelete() { - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deleteCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, deadline); List ops = pendingOps.getPendingOps(new ContainerID(1)); @@ -176,11 +184,11 @@ public void testCanAddReplicasForDelete() { @Test public void testCompletingOps() { - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, deadline); + pendingOps.scheduleDeleteReplica(new 
ContainerID(1), dn2, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, deadline); List ops = pendingOps.getPendingOps(new ContainerID(1)); @@ -209,11 +217,11 @@ public void testCompletingOps() { @Test public void testRemoveSpecificOp() { - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline); - pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, deadline); + pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, deadline); ContainerID cid = new ContainerID(1); List ops = pendingOps.getPendingOps(cid); @@ -232,11 +240,11 @@ public void testRemoveExpiredEntries() { long expiry = clock.millis() + 1000; long laterExpiry = clock.millis() + 2000; long latestExpiry = clock.millis() + 3000; - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, expiry); - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, expiry); - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, laterExpiry); - pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, laterExpiry); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, latestExpiry); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 
0, deleteCmd, laterExpiry); + pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, laterExpiry); + pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, latestExpiry); List ops = pendingOps.getPendingOps(new ContainerID(1)); @@ -284,12 +292,12 @@ public void testRemoveExpiredEntries() { @Test public void testReplicationMetrics() { long expiry = clock.millis() + 1000; - pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, expiry); - pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, expiry); - pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, expiry); - pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, expiry); - pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, expiry); - pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, deleteCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, addCmd, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, deleteCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, addCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, addCmd, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, deleteCmd, expiry); // InFlight Replication and Deletion assertEquals(3, pendingOps.getPendingOpCount(ADD)); @@ -310,12 +318,12 @@ public void testReplicationMetrics() { assertEquals(metrics.getReplicaDeleteTimeoutTotal(), 1); expiry = clock.millis() + 1000; - pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, expiry); - pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, expiry); - pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, expiry); - pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, expiry); - pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, expiry); - pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, expiry); + 
pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, deleteCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, addCmd, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, deleteCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, addCmd, expiry); + pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, addCmd, expiry); + pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, deleteCmd, expiry); // InFlight Replication and Deletion. Previous Inflight should be // removed as they were timed out. @@ -358,9 +366,9 @@ public void testNotifySubscribers() { // schedule an ADD and a DELETE ContainerID containerID = new ContainerID(1); - pendingOps.scheduleAddReplica(containerID, dn1, 0, deadline); + pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline); ContainerReplicaOp addOp = pendingOps.getPendingOps(containerID).get(0); - pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline); + pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline); // complete the ADD and verify that subscribers were notified pendingOps.completeAddReplica(containerID, dn1, 0); @@ -374,8 +382,8 @@ public void testNotifySubscribers() { verify(subscriber2, times(1)).opCompleted(deleteOp, containerID, false); // now, test notification on expiration - pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline); - pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline); + pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline); for (ContainerReplicaOp op : pendingOps.getPendingOps(containerID)) { if (op.getOpType() == ADD) { addOp = op; @@ -397,8 +405,8 @@ public void subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() { ContainerID containerID = new ContainerID(1); // schedule ops - pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline); - 
pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline); + pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline); + pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline); // register subscriber ContainerReplicaPendingOpsSubscriber subscriber1 = mock( @@ -417,7 +425,7 @@ public void subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() { ContainerID containerID = new ContainerID(1); // schedule ops - pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline); + pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline); // register subscriber ContainerReplicaPendingOpsSubscriber subscriber1 = mock( @@ -425,7 +433,7 @@ public void subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() { pendingOps.registerSubscriber(subscriber1); clock.fastForward(1000); - pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline + 1); + pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1); // no entries have expired, so there should be zero interactions with the // subscriber verifyNoMoreInteractions(subscriber1); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java index ff0b838bd8b9..da7a41de2800 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java @@ -92,7 +92,7 @@ public void testContainerMissingReplica() { // appears missing ContainerReplicaOp op = new ContainerReplicaOp( ContainerReplicaOp.PendingOpType.ADD, - MockDatanodeDetails.randomDatanodeDetails(), 5, Long.MAX_VALUE); + MockDatanodeDetails.randomDatanodeDetails(), 5, null, Long.MAX_VALUE); rcnt.addPendingOp(op); 
assertTrue(rcnt.isSufficientlyReplicated(true)); assertEquals(0, rcnt.unavailableIndexes(true).size()); @@ -213,7 +213,7 @@ public void testOverReplicatedContainer() { // as not over replicated. rcnt.addPendingOp(new ContainerReplicaOp( ContainerReplicaOp.PendingOpType.DELETE, - MockDatanodeDetails.randomDatanodeDetails(), 2, Long.MAX_VALUE)); + MockDatanodeDetails.randomDatanodeDetails(), 2, null, Long.MAX_VALUE)); assertFalse(rcnt.isOverReplicated(true)); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index d33539445106..6c9775b881cd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -224,9 +224,9 @@ private void enableProcessAll() { @Test public void testPendingOpsClearedWhenStarting() { containerReplicaPendingOps.scheduleAddReplica(ContainerID.valueOf(1), - MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE); + MockDatanodeDetails.randomDatanodeDetails(), 1, null, Integer.MAX_VALUE); containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), - MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE); + MockDatanodeDetails.randomDatanodeDetails(), 1, null, Integer.MAX_VALUE); assertEquals(1, containerReplicaPendingOps .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD)); assertEquals(1, containerReplicaPendingOps @@ -733,7 +733,7 @@ public void testUnderReplicatedContainerFixedByPending() HddsProtos.LifeCycleState.CLOSED); addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4); containerReplicaPendingOps.scheduleAddReplica(container.containerID(), - MockDatanodeDetails.randomDatanodeDetails(), 5, + 
MockDatanodeDetails.randomDatanodeDetails(), 5, null, clock.millis() + 10000); replicationManager.processContainer( @@ -1024,7 +1024,7 @@ public void testOverReplicatedFixByPending() addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5, 5); containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(), - MockDatanodeDetails.randomDatanodeDetails(), 5, + MockDatanodeDetails.randomDatanodeDetails(), 5, null, clock.millis() + 10000); replicationManager.processContainer( container, repQueue, repReport); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java index d54011859900..3267569684f5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java @@ -250,10 +250,10 @@ private void loadPendingOps(ContainerInfo container, Scenario scenario) { for (PendingReplica r : scenario.getPendingReplicas()) { if (r.getType() == ContainerReplicaOp.PendingOpType.ADD) { containerReplicaPendingOps.scheduleAddReplica(container.containerID(), r.getDatanodeDetails(), - r.getReplicaIndex(), Long.MAX_VALUE); + r.getReplicaIndex(), null, Long.MAX_VALUE); } else if (r.getType() == ContainerReplicaOp.PendingOpType.DELETE) { containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(), r.getDatanodeDetails(), - r.getReplicaIndex(), Long.MAX_VALUE); + r.getReplicaIndex(), null, Long.MAX_VALUE); } } } From 11bbf82d53c75e5113abf41af2bacf0ff65b7e4c Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 22 Jan 2025 17:57:30 +0000 Subject: [PATCH 3/6] Do not remove deletes from pending ops on expiry, but continue to notify subscribers --- 
.../ContainerReplicaPendingOps.java | 8 +++- .../TestContainerReplicaPendingOps.java | 43 +++++++++++-------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 0a2d1d57021b..999449461d67 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -227,9 +227,13 @@ public void removeExpiredEntries() { while (iterator.hasNext()) { ContainerReplicaOp op = iterator.next(); if (clock.millis() > op.getDeadlineEpochMillis()) { - iterator.remove(); + if (op.getOpType() != DELETE) { + // For delete ops, we don't remove them from the list as RM must resend them, or they + // will be removed via a container report when they are confirmed as deleted. 
+ iterator.remove(); + decrementCounter(op.getOpType(), op.getReplicaIndex()); + } expiredOps.add(op); - decrementCounter(op.getOpType(), op.getReplicaIndex()); updateTimeoutMetrics(op); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java index fec80923e8dc..0326a95f6c5c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java @@ -245,12 +245,13 @@ public void testRemoveExpiredEntries() { pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, laterExpiry); pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, laterExpiry); pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, latestExpiry); + pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, latestExpiry); List ops = pendingOps.getPendingOps(new ContainerID(1)); assertEquals(4, ops.size()); ops = pendingOps.getPendingOps(new ContainerID(2)); - assertEquals(1, ops.size()); + assertEquals(2, ops.size()); // Some entries expire at "start + 1000" some at start + 2000 and // start + 3000. Clock is currently at "start" @@ -262,31 +263,39 @@ public void testRemoveExpiredEntries() { clock.fastForward(1000); pendingOps.removeExpiredEntries(); - // Those with deadline + 1000 should be removed. 
+ // Those ADD with deadline + 1000 should be removed, but deletes are retained ops = pendingOps.getPendingOps(new ContainerID(1)); - assertEquals(2, ops.size()); + assertEquals(3, ops.size()); // We should lose the entries for DN1 - List dns = ops.stream() - .map(ContainerReplicaOp::getTarget) - .collect(Collectors.toList()); - assertThat(dns).doesNotContain(dn1); - assertThat(dns).contains(dn2); - assertThat(dns).contains(dn3); + assertFalse(isOpPresent(ops, dn1, 0, ADD)); + assertTrue(isOpPresent(ops, dn1, 0, DELETE)); + assertTrue(isOpPresent(ops, dn2, 0, DELETE)); clock.fastForward(1000); pendingOps.removeExpiredEntries(); - // Now should only have entries for container 2 + // Now should only have entries for container 2 and the deletes for container 1 ops = pendingOps.getPendingOps(new ContainerID(1)); - assertEquals(0, ops.size()); + assertEquals(2, ops.size()); + + assertTrue(isOpPresent(ops, dn1, 0, DELETE)); + assertTrue(isOpPresent(ops, dn2, 0, DELETE)); + ops = pendingOps.getPendingOps(new ContainerID(2)); - assertEquals(1, ops.size()); + assertEquals(2, ops.size()); - // Advance the clock again and all should be removed + // Advance the clock again and all should be removed except deletes clock.fastForward(1000); pendingOps.removeExpiredEntries(); ops = pendingOps.getPendingOps(new ContainerID(2)); - assertEquals(0, ops.size()); + assertTrue(isOpPresent(ops, dn1, 1, DELETE)); + assertEquals(1, ops.size()); + } + + private boolean isOpPresent(List ops, DatanodeDetails dn, + int index, ContainerReplicaOp.PendingOpType type) { + return ops.stream().anyMatch(op -> op.getTarget().equals(dn) && + op.getReplicaIndex() == index && op.getOpType() == type); } @Test @@ -326,9 +335,9 @@ public void testReplicationMetrics() { pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, deleteCmd, expiry); // InFlight Replication and Deletion. Previous Inflight should be - // removed as they were timed out. 
+ // removed as they were timed out, but deletes are retained assertEquals(3, pendingOps.getPendingOpCount(ADD)); - assertEquals(3, pendingOps.getPendingOpCount(DELETE)); + assertEquals(6, pendingOps.getPendingOpCount(DELETE)); pendingOps.completeDeleteReplica(new ContainerID(3), dn1, 2); pendingOps.completeAddReplica(new ContainerID(3), dn1, 3); @@ -347,7 +356,7 @@ public void testReplicationMetrics() { // Checking pendingOpCount doesn't go below zero assertEquals(0, pendingOps.getPendingOpCount(ADD)); - assertEquals(0, pendingOps.getPendingOpCount(DELETE)); + assertEquals(3, pendingOps.getPendingOpCount(DELETE)); } /** From 9c1e0c09b85a18f135c8b54cea0cfd731a5e5c6c Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 23 Jan 2025 17:23:43 +0000 Subject: [PATCH 4/6] Add completeOp method / interface to RM --- .../ContainerReplicaPendingOps.java | 3 -- .../replication/ReplicationManager.java | 23 +++++++++++++- .../container/balancer/TestMoveManager.java | 2 +- .../replication/TestReplicationManager.java | 30 +++++++++++++++++++ 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 999449461d67..ad2f8b6bf4a2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -21,10 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import 
org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.jheaps.annotations.VisibleForTesting; import java.time.Clock; import java.util.ArrayList; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index a63c5313f7b0..f1775d170d80 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -103,7 +103,7 @@ * that the containers are properly replicated. Replication Manager deals only * with Quasi Closed / Closed container. */ -public class ReplicationManager implements SCMService { +public class ReplicationManager implements SCMService, ContainerReplicaPendingOpsSubscriber { public static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class); @@ -1041,6 +1041,27 @@ ReplicationQueue getQueue() { return replicationQueue.get(); } + @Override + public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) { + if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) { + // We only care about expired delete ops. All others should be ignored. + return; + } + try { + ContainerInfo containerInfo = containerManager.getContainer(containerID); + // Sending the command in this way is un-throttled, and the command will have its deadline + // adjusted to a new deadline as part of the sending process. 
+ sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget()); + } catch (ContainerNotFoundException e) { + // Should not happen, as even deleted containers are currently retained in the SCM container map + LOG.error("Container {} not found when processing expired delete", containerID, e); + } catch (NotLeaderException e) { + // If SCM leadership has changed, this is fine to ignore. All pending ops will be expired + // once SCM leadership switches. + LOG.warn("SCM is not leader when processing expired delete", e); + } + } + /** * Configuration used by the Replication Manager. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java index 2401dc511db5..8cc62ab03cb1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java @@ -374,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception { anyLong()); ContainerReplicaOp op = new ContainerReplicaOp( - ADD, tgt, srcReplica.getReplicaIndex(), null,clock.millis() + 1000); + ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000); moveManager.opCompleted(op, containerInfo.containerID(), false); verify(replicationManager).sendDeleteCommand( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 6c9775b881cd..34da3a077fb0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -92,6 +92,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1739,6 +1740,35 @@ public void testInflightReplicationLimit() throws IOException { rm.getReplicationInFlightLimit()); } + @Test + public void testPendingOpExpiry() throws ContainerNotFoundException { + when(containerManager.getContainer(any())) + .thenReturn(ReplicationTestUtil.createContainerInfo(repConfig, 1, + HddsProtos.LifeCycleState.CLOSED, 10, 20)); + // This is just some arbitrary epoch time in the past + long commandDeadline = 1000; + SCMCommand command = new DeleteContainerCommand(1L, true); + + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails(); + + ContainerReplicaOp addOp = ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, dn1, 1); + ContainerReplicaOp delOp = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.DELETE, dn2, 1, command, commandDeadline); + + replicationManager.opCompleted(addOp, new ContainerID(1L), false); + replicationManager.opCompleted(delOp, new ContainerID(1L), false); + // No commands should be sent for either of the above ops. 
+ assertEquals(0, commandsSent.size()); + + replicationManager.opCompleted(delOp, new ContainerID(1L), true); + assertEquals(1, commandsSent.size()); + Pair<UUID, SCMCommand<?>> sentCommand = commandsSent.iterator().next(); + // The target should be DN2 and the deadline should have been updated from the value set in commandDeadline above + assertEquals(dn2.getUuid(), sentCommand.getLeft()); + assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline()); + } + @SafeVarargs private final Set addReplicas(ContainerInfo container, ContainerReplicaProto.State replicaState, From c270e834194d778273e11c9ef73ba194beea13d2 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 24 Jan 2025 12:24:06 +0000 Subject: [PATCH 5/6] Subscribe RM to notifications from ContainerReplicaPendingOps --- .../apache/hadoop/hdds/scm/server/StorageContainerManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 52148c3d6835..75cb53ebc046 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -837,6 +837,9 @@ private void initializeSystemManagers(OzoneConfiguration conf, reconfigurationHandler.register(replicationManager.getConfig()); } serviceManager.register(replicationManager); + // RM gets notified of expired pending deletes from containerReplicaPendingOps by subscribing to it + // so it can resend them.
+ containerReplicaPendingOps.registerSubscriber(replicationManager); if (configurator.getScmSafeModeManager() != null) { scmSafeModeManager = configurator.getScmSafeModeManager(); } else { From 49e3f7a4c2d6ec7cf8d9419e128c38b4939f26ff Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 24 Jan 2025 16:16:45 +0000 Subject: [PATCH 6/6] Fix failing test by capturing logs earlier --- .../common/statemachine/commandhandler/TestBlockDeletion.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index df5f3ec0d270..732c75ce689d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -404,6 +404,8 @@ public void testContainerStatisticsAfterDelete() throws Exception { }); }); + LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG); + logCapturer.clearOutput(); cluster.shutdownHddsDatanode(0); replicationManager.processAll(); ((EventQueue)scm.getEventQueue()).processAll(1000); @@ -411,8 +413,6 @@ public void testContainerStatisticsAfterDelete() throws Exception { containerInfos.stream().forEach(container -> assertEquals(HddsProtos.LifeCycleState.DELETING, container.getState())); - LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG); - logCapturer.clearOutput(); Thread.sleep(5000); replicationManager.processAll();