Skip to content

Commit

Permalink
HDDS-12127. RM should not expire pending deletes, but retry until del…
Browse files Browse the repository at this point in the history
…ete is confirmed or node is dead (#7746)
  • Loading branch information
sodonnel authored Jan 28, 2025
1 parent f1b59f1 commit 04f6255
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

/**
* Class to wrap details used to track pending replications.
Expand All @@ -34,19 +35,20 @@ public enum PendingOpType {
private final PendingOpType opType;
private final DatanodeDetails target;
private final int replicaIndex;
private final SCMCommand<?> command;
private final long deadlineEpochMillis;

public static ContainerReplicaOp create(PendingOpType opType,
DatanodeDetails target, int replicaIndex) {
return new ContainerReplicaOp(opType, target, replicaIndex,
System.currentTimeMillis());
return new ContainerReplicaOp(opType, target, replicaIndex, null, System.currentTimeMillis());
}

public ContainerReplicaOp(PendingOpType opType,
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
this.opType = opType;
this.target = target;
this.replicaIndex = replicaIndex;
this.command = command;
this.deadlineEpochMillis = deadlineEpochMillis;
}

Expand All @@ -62,6 +64,10 @@ public int getReplicaIndex() {
return replicaIndex;
}

/**
* Returns the command that was stored with this op at construction time.
* May be null: the static create factory in this class passes null for it.
*/
public SCMCommand<?> getCommand() {
return command;
}

/**
* Returns the deadline given to the constructor, in epoch milliseconds.
*/
public long getDeadlineEpochMillis() {
return deadlineEpochMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import java.time.Clock;
import java.util.ArrayList;
Expand Down Expand Up @@ -117,27 +118,29 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
* @param containerID ContainerID for which to add a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
* @param command The command to send to the datanode
* @param deadlineEpochMillis The time by which the replica should have been
* added and reported by the datanode, or it will
* be discarded.
*/
public void scheduleAddReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis);
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
addReplica(ADD, containerID, target, replicaIndex, command, deadlineEpochMillis);
}

/**
* Store a ContainerReplicaOp to delete a replica for the given ContainerID.
* @param containerID ContainerID for which to delete a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
* @param command The command to send to the datanode
* @param deadlineEpochMillis The time by which the replica should have been
* deleted and reported by the datanode. An expired
* delete op is not discarded; it is kept so it can
* be resent until the delete is confirmed.
*/
public void scheduleDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis);
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long deadlineEpochMillis) {
addReplica(DELETE, containerID, target, replicaIndex, command, deadlineEpochMillis);
}

/**
Expand All @@ -150,7 +153,7 @@ public void scheduleDeleteReplica(ContainerID containerID,
*/
public boolean completeAddReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
boolean completed = completeOp(ADD, containerID, target, replicaIndex);
boolean completed = completeOp(ADD, containerID, target, replicaIndex, true);
if (isMetricsNotNull() && completed) {
if (isEC(replicaIndex)) {
replicationMetrics.incrEcReplicasCreatedTotal();
Expand All @@ -172,7 +175,7 @@ public boolean completeAddReplica(ContainerID containerID,
*/
public boolean completeDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
boolean completed = completeOp(DELETE, containerID, target, replicaIndex);
boolean completed = completeOp(DELETE, containerID, target, replicaIndex, true);
if (isMetricsNotNull() && completed) {
if (isEC(replicaIndex)) {
replicationMetrics.incrEcReplicasDeletedTotal();
Expand All @@ -192,7 +195,7 @@ public boolean completeDeleteReplica(ContainerID containerID,
public boolean removeOp(ContainerID containerID,
ContainerReplicaOp op) {
return completeOp(op.getOpType(), containerID, op.getTarget(),
op.getReplicaIndex());
op.getReplicaIndex(), true);
}

/**
Expand Down Expand Up @@ -221,9 +224,13 @@ public void removeExpiredEntries() {
while (iterator.hasNext()) {
ContainerReplicaOp op = iterator.next();
if (clock.millis() > op.getDeadlineEpochMillis()) {
iterator.remove();
if (op.getOpType() != DELETE) {
// For delete ops, we don't remove them from the list as RM must resend them, or they
// will be removed via a container report when they are confirmed as deleted.
iterator.remove();
decrementCounter(op.getOpType(), op.getReplicaIndex());
}
expiredOps.add(op);
decrementCounter(op.getOpType(), op.getReplicaIndex());
updateTimeoutMetrics(op);
}
}
Expand Down Expand Up @@ -258,23 +265,26 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
}

private void addReplica(ContainerReplicaOp.PendingOpType opType,
ContainerID containerID, DatanodeDetails target, int replicaIndex,
ContainerID containerID, DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
long deadlineEpochMillis) {
Lock lock = writeLock(containerID);
lock(lock);
try {
// Remove any existing duplicate op for the same target and replicaIndex before adding
// the new one. Especially for delete ops, they could be getting resent after expiry.
completeOp(opType, containerID, target, replicaIndex, false);
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
containerID, s -> new ArrayList<>());
ops.add(new ContainerReplicaOp(opType,
target, replicaIndex, deadlineEpochMillis));
target, replicaIndex, command, deadlineEpochMillis));
incrementCounter(opType, replicaIndex);
} finally {
unlock(lock);
}
}

private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
ContainerID containerID, DatanodeDetails target, int replicaIndex) {
ContainerID containerID, DatanodeDetails target, int replicaIndex, boolean notifySubsribers) {
boolean found = false;
// List of completed ops that subscribers will be notified about
List<ContainerReplicaOp> completedOps = new ArrayList<>();
Expand Down Expand Up @@ -303,7 +313,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
unlock(lock);
}

if (found) {
if (found && notifySubsribers) {
notifySubscribers(completedOps, containerID, false);
}
return found;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ private void createReplicateCommand(
private void adjustPendingOps(ECContainerReplicaCount replicaCount,
DatanodeDetails target, int replicaIndex) {
replicaCount.addPendingOp(new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null,
Long.MAX_VALUE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed container.
*/
public class ReplicationManager implements SCMService {
public class ReplicationManager implements SCMService, ContainerReplicaPendingOpsSubscriber {

public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
Expand Down Expand Up @@ -673,8 +673,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
if (cmd.getType() == Type.deleteContainerCommand) {
DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
containerReplicaPendingOps.scheduleDeleteReplica(
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
scmDeadlineEpochMs);
containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
if (rcc.getReplicaIndex() > 0) {
getMetrics().incrEcDeletionCmdsSentTotal();
} else if (rcc.getReplicaIndex() == 0) {
Expand All @@ -687,8 +686,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
final ByteString targetIndexes = rcc.getMissingContainerIndexes();
for (int i = 0; i < targetIndexes.size(); i++) {
containerReplicaPendingOps.scheduleAddReplica(
containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i),
scmDeadlineEpochMs);
containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs);
}
getMetrics().incrEcReconstructionCmdsSentTotal();
} else if (cmd.getType() == Type.replicateContainerCommand) {
Expand All @@ -702,15 +700,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
*/
containerReplicaPendingOps.scheduleAddReplica(
containerInfo.containerID(),
targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs);
targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
} else {
/*
This means the source will push replica to the target, so the op's
target Datanode should be the Datanode the replica will be pushed to.
*/
containerReplicaPendingOps.scheduleAddReplica(
containerInfo.containerID(),
rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs);
rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
}

if (rcc.getReplicaIndex() > 0) {
Expand Down Expand Up @@ -1043,6 +1041,27 @@ ReplicationQueue getQueue() {
return replicationQueue.get();
}

/**
 * Resends the datanode command of a pending DELETE op whose deadline has
 * passed. Every other notification (completed ops, or timed out ops that are
 * not deletes) is ignored.
 *
 * @param op the pending op this notification is about
 * @param containerID the container the op belongs to
 * @param timedOut true when the op is being reported because it expired
 */
@Override
public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) {
  // We only care about expired delete ops. All others should be ignored.
  if (!timedOut) {
    return;
  }
  if (op.getOpType() != ContainerReplicaOp.PendingOpType.DELETE) {
    return;
  }
  if (op.getCommand() == null) {
    // An op can be created without a command (ContainerReplicaOp allows null),
    // in which case there is nothing to resend. Skip it rather than handing
    // null to sendDatanodeCommand.
    LOG.warn("Expired delete for container {} on {} has no command to resend",
        containerID, op.getTarget());
    return;
  }
  try {
    ContainerInfo containerInfo = containerManager.getContainer(containerID);
    // Sending the command in this way is un-throttled, and the command will have its deadline
    // adjusted to a new deadline as part of the sending process.
    sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget());
  } catch (ContainerNotFoundException e) {
    // Should not happen, as even deleted containers are currently retained in the SCM container map
    LOG.error("Container {} not found when processing expired delete", containerID, e);
  } catch (NotLeaderException e) {
    // If SCM leadership has changed, this is fine to ignore. All pending ops will be expired
    // once SCM leadership switches.
    LOG.warn("SCM is not leader when processing expired delete", e);
  }
}

/**
* Configuration used by the Replication Manager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,9 @@ private void initializeSystemManagers(OzoneConfiguration conf,
reconfigurationHandler.register(replicationManager.getConfig());
}
serviceManager.register(replicationManager);
// RM subscribes to containerReplicaPendingOps so that it is notified of expired pending
// deletes and can resend them.
containerReplicaPendingOps.registerSubscriber(replicationManager);
if (configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ public void testMovePendingOpsExist() throws Exception {
nodes.put(src, NodeStatus.inServiceHealthy());
nodes.put(tgt, NodeStatus.inServiceHealthy());

pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, clock.millis()));
pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, null, clock.millis()));

assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_REPLICATION,
containerInfo.containerID());

pendingOps.clear();
pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, clock.millis()));
pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, null, clock.millis()));
assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_DELETION,
containerInfo.containerID());
}
Expand Down Expand Up @@ -325,7 +325,7 @@ public void testDeleteCommandFails() throws Exception {
.when(containerManager).getContainer(any(ContainerID.class));

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult moveResult = res.get();
Expand All @@ -337,14 +337,14 @@ public void testSuccessfulMove() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());

op = new ContainerReplicaOp(
DELETE, src, 0, clock.millis() + 1000);
DELETE, src, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand Down Expand Up @@ -374,15 +374,15 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception {
anyLong());

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, srcReplica.getReplicaIndex(), clock.millis() + 1000);
ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src),
eq(true), anyLong());

op = new ContainerReplicaOp(
DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000);
DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -394,7 +394,7 @@ public void testMoveTimeoutOnAdd() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), true);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -406,14 +406,14 @@ public void testMoveTimeoutOnDelete() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());

op = new ContainerReplicaOp(
DELETE, src, 0, clock.millis() + 1000);
DELETE, src, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), true);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -434,7 +434,7 @@ public void testMoveCompleteSrcNoLongerPresent() throws Exception {
}
}
ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -450,7 +450,7 @@ public void testMoveCompleteSrcNotHealthy() throws Exception {

nodes.put(src, NodeStatus.inServiceStale());
ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -468,7 +468,7 @@ public void testMoveCompleteSrcNotInService() throws Exception {
HddsProtos.NodeOperationalState.DECOMMISSIONING,
HddsProtos.NodeState.HEALTHY));
ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand All @@ -487,7 +487,7 @@ public void testMoveCompleteFutureReplicasUnhealthy() throws Exception {
.MisReplicatedHealthResult(containerInfo, false, null));

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult finalResult = res.get();
Expand Down
Loading

0 comments on commit 04f6255

Please sign in to comment.