Skip to content

Commit f2987f7

Browse files
sodonnelCyrill
authored andcommitted
HDDS-12566. Handle Over replication of Quasi Closed Stuck containers (apache#8061)
1 parent 63d7c4f commit f2987f7

File tree

6 files changed

+1302
-8
lines changed

6 files changed

+1302
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.container.replication;
19+
20+
import java.io.IOException;
21+
import java.util.Comparator;
22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.stream.Collectors;
25+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
26+
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
27+
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
28+
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
29+
30+
/**
31+
* Class to correct over replicated QuasiClosed Stuck Ratis containers.
32+
*/
33+
public class QuasiClosedStuckOverReplicationHandler implements UnhealthyReplicationHandler {
34+
35+
private static final org.slf4j.Logger LOG =
36+
org.slf4j.LoggerFactory.getLogger(QuasiClosedStuckOverReplicationHandler.class);
37+
private final ReplicationManager replicationManager;
38+
private final ReplicationManagerMetrics metrics;
39+
40+
public QuasiClosedStuckOverReplicationHandler(final ReplicationManager replicationManager) {
41+
this.replicationManager = replicationManager;
42+
this.metrics = replicationManager.getMetrics();
43+
}
44+
45+
@Override
46+
public int processAndSendCommands(Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
47+
ContainerHealthResult result, int remainingMaintenanceRedundancy)
48+
throws IOException {
49+
50+
ContainerInfo containerInfo = result.getContainerInfo();
51+
LOG.debug("Handling over replicated QuasiClosed Stuck Ratis container {}", containerInfo);
52+
53+
int pendingDelete = 0;
54+
for (ContainerReplicaOp op : pendingOps) {
55+
if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
56+
pendingDelete++;
57+
}
58+
}
59+
60+
if (pendingDelete > 0) {
61+
LOG.debug("Container {} has pending delete operations. No more over replication will be scheduled until they " +
62+
"complete", containerInfo);
63+
return 0;
64+
}
65+
66+
// Filter out any STALE replicas, as they may go dead soon. If so, we don't want to remove other healthy replicas
67+
// instead of them, as they could result in under replication.
68+
Set<ContainerReplica> healthyReplicas = replicas.stream()
69+
.filter(replica -> {
70+
try {
71+
return replicationManager.getNodeStatus(
72+
replica.getDatanodeDetails()).getHealth() == HddsProtos.NodeState.HEALTHY;
73+
} catch (NodeNotFoundException e) {
74+
return false;
75+
}
76+
})
77+
.collect(Collectors.toSet());
78+
79+
QuasiClosedStuckReplicaCount replicaCount =
80+
new QuasiClosedStuckReplicaCount(healthyReplicas, remainingMaintenanceRedundancy);
81+
82+
List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins
83+
= replicaCount.getOverReplicatedOrigins();
84+
85+
if (misReplicatedOrigins.isEmpty()) {
86+
LOG.debug("Container {} is not over replicated", containerInfo);
87+
return 0;
88+
}
89+
90+
int totalCommandsSent = 0;
91+
IOException firstException = null;
92+
for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin origin : misReplicatedOrigins) {
93+
List<ContainerReplica> sortedReplicas = getSortedReplicas(origin.getSources());
94+
for (int i = 0; i < origin.getReplicaDelta(); i++) {
95+
try {
96+
replicationManager.sendThrottledDeleteCommand(
97+
containerInfo, 0, sortedReplicas.get(i).getDatanodeDetails(), true);
98+
totalCommandsSent++;
99+
} catch (CommandTargetOverloadedException e) {
100+
LOG.debug("Unable to send delete command for container {} to {} as it has too many pending delete commands",
101+
containerInfo, sortedReplicas.get(i).getDatanodeDetails());
102+
firstException = e;
103+
}
104+
}
105+
}
106+
107+
if (firstException != null) {
108+
// Some nodes were overloaded when attempting to send commands.
109+
if (totalCommandsSent > 0) {
110+
metrics.incrPartialReplicationTotal();
111+
}
112+
throw firstException;
113+
}
114+
return totalCommandsSent;
115+
}
116+
117+
private List<ContainerReplica> getSortedReplicas(
118+
Set<ContainerReplica> replicas) {
119+
// sort replicas so that they can be selected in a deterministic way
120+
return replicas.stream()
121+
.sorted(Comparator.comparingLong(ContainerReplica::hashCode))
122+
.collect(Collectors.toList());
123+
}
124+
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public class ReplicationManager implements SCMService {
196196
private final RatisOverReplicationHandler ratisOverReplicationHandler;
197197
private final RatisMisReplicationHandler ratisMisReplicationHandler;
198198
private final QuasiClosedStuckUnderReplicationHandler quasiClosedStuckUnderReplicationHandler;
199+
private final QuasiClosedStuckOverReplicationHandler quasiClosedStuckOverReplicationHandler;
199200
private Thread underReplicatedProcessorThread;
200201
private Thread overReplicatedProcessorThread;
201202
private final UnderReplicatedProcessor underReplicatedProcessor;
@@ -267,6 +268,7 @@ public ReplicationManager(final ConfigurationSource conf,
267268
ratisContainerPlacement, conf, this);
268269
quasiClosedStuckUnderReplicationHandler =
269270
new QuasiClosedStuckUnderReplicationHandler(ratisContainerPlacement, conf, this);
271+
quasiClosedStuckOverReplicationHandler = new QuasiClosedStuckOverReplicationHandler(this);
270272
underReplicatedProcessor =
271273
new UnderReplicatedProcessor(this, rmConf::getUnderReplicatedInterval);
272274
overReplicatedProcessor =
@@ -805,8 +807,16 @@ int processOverReplicatedContainer(
805807
containerReplicaPendingOps.getPendingOps(containerID);
806808

807809
final boolean isEC = isEC(result.getContainerInfo().getReplicationConfig());
808-
final UnhealthyReplicationHandler handler = isEC ? ecOverReplicationHandler
809-
: ratisOverReplicationHandler;
810+
UnhealthyReplicationHandler handler;
811+
if (isEC) {
812+
handler = ecOverReplicationHandler;
813+
} else {
814+
if (QuasiClosedStuckReplicationCheck.shouldHandleAsQuasiClosedStuck(result.getContainerInfo(), replicas)) {
815+
handler = quasiClosedStuckOverReplicationHandler;
816+
} else {
817+
handler = ratisOverReplicationHandler;
818+
}
819+
}
810820

811821
return handler.processAndSendCommands(replicas,
812822
pendingOps, result, getRemainingMaintenanceRedundancy(isEC));

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,29 @@ public static void mockRMSendDeleteCommand(ReplicationManager mock,
499499
* @param commandsSent Set to add the command to rather than sending it.
500500
*/
501501
public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
502-
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
502+
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
503+
throws NotLeaderException, CommandTargetOverloadedException {
504+
mockRMSendThrottledDeleteCommand(mock, commandsSent, new AtomicBoolean(false));
505+
}
506+
507+
/**
508+
* Given a Mockito mock of ReplicationManager, this method will mock the
509+
* sendThrottledDeleteCommand method so that it adds the command created to
510+
* the commandsSent set.
511+
* @param mock Mock of ReplicationManager
512+
* @param commandsSent Set to add the command to rather than sending it.
513+
* @param throwOverloaded If the atomic boolean is true, throw a
514+
* CommandTargetOverloadedException and set the boolean
515+
* to false, instead of creating the replicate command.
516+
*/
517+
public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
518+
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent, AtomicBoolean throwOverloaded)
503519
throws NotLeaderException, CommandTargetOverloadedException {
504520
doAnswer((Answer<Void>) invocationOnMock -> {
521+
if (throwOverloaded.get()) {
522+
throwOverloaded.set(false);
523+
throw new CommandTargetOverloadedException("Overloaded");
524+
}
505525
ContainerInfo containerInfo = invocationOnMock.getArgument(0);
506526
int replicaIndex = invocationOnMock.getArgument(1);
507527
DatanodeDetails target = invocationOnMock.getArgument(2);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.container.replication;
19+
20+
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
26+
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.HashSet;
31+
import java.util.List;
32+
import java.util.Set;
33+
import java.util.UUID;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import org.apache.commons.lang3.tuple.Pair;
36+
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
37+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
38+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
39+
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
40+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
41+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
42+
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
43+
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
44+
import org.apache.hadoop.hdds.scm.node.NodeStatus;
45+
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
46+
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
47+
import org.apache.ratis.protocol.exceptions.NotLeaderException;
48+
import org.junit.jupiter.api.BeforeEach;
49+
import org.junit.jupiter.api.Test;
50+
51+
/**
52+
* Test for QuasiClosedStuckOverReplicationHandler.
53+
*/
54+
public class TestQuasiClosedStuckOverReplicationHandler {
55+
56+
private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig.getInstance(THREE);
57+
private ContainerInfo container;
58+
private ReplicationManager replicationManager;
59+
private ReplicationManagerMetrics metrics;
60+
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
61+
private QuasiClosedStuckOverReplicationHandler handler;
62+
private UUID origin1 = UUID.randomUUID();
63+
private UUID origin2 = UUID.randomUUID();
64+
65+
@BeforeEach
66+
void setup() throws NodeNotFoundException,
67+
CommandTargetOverloadedException, NotLeaderException {
68+
container = ReplicationTestUtil.createContainer(
69+
HddsProtos.LifeCycleState.QUASI_CLOSED, RATIS_REPLICATION_CONFIG);
70+
71+
replicationManager = mock(ReplicationManager.class);
72+
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
73+
ozoneConfiguration.setBoolean("hdds.scm.replication.push", true);
74+
when(replicationManager.getConfig())
75+
.thenReturn(ozoneConfiguration.getObject(
76+
ReplicationManager.ReplicationManagerConfiguration.class));
77+
metrics = ReplicationManagerMetrics.create(replicationManager);
78+
when(replicationManager.getMetrics()).thenReturn(metrics);
79+
80+
/*
81+
Return NodeStatus with NodeOperationalState as specified in
82+
DatanodeDetails, and NodeState as HEALTHY.
83+
*/
84+
when(
85+
replicationManager.getNodeStatus(any(DatanodeDetails.class)))
86+
.thenAnswer(invocationOnMock -> {
87+
DatanodeDetails dn = invocationOnMock.getArgument(0);
88+
return new NodeStatus(dn.getPersistedOpState(),
89+
HddsProtos.NodeState.HEALTHY);
90+
});
91+
92+
commandsSent = new HashSet<>();
93+
ReplicationTestUtil.mockRMSendThrottledDeleteCommand(
94+
replicationManager, commandsSent);
95+
handler = new QuasiClosedStuckOverReplicationHandler(replicationManager);
96+
}
97+
98+
@Test
99+
public void testReturnsZeroIfNotOverReplicated() throws IOException {
100+
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
101+
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
102+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
103+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
104+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
105+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
106+
107+
int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
108+
assertEquals(0, count);
109+
}
110+
111+
@Test
112+
public void testNoCommandsScheduledIfPendingOps() throws IOException {
113+
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
114+
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
115+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
116+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
117+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
118+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
119+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
120+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
121+
List<ContainerReplicaOp> pendingOps = new ArrayList<>();
122+
pendingOps.add(ContainerReplicaOp.create(
123+
ContainerReplicaOp.PendingOpType.DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
124+
125+
int count = handler.processAndSendCommands(replicas, pendingOps, getOverReplicatedHealthResult(), 1);
126+
assertEquals(0, count);
127+
}
128+
129+
@Test
130+
public void testCommandScheduledForOverReplicatedContainer() throws IOException {
131+
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
132+
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
133+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
134+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
135+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
136+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
137+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
138+
139+
int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
140+
assertEquals(1, count);
141+
SCMCommand<?> command = commandsSent.iterator().next().getRight();
142+
assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand, command.getType());
143+
}
144+
145+
@Test
146+
public void testOverloadedExceptionContinuesAndThrows() throws NotLeaderException, CommandTargetOverloadedException {
147+
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
148+
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
149+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
150+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
151+
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
152+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
153+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
154+
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
155+
156+
ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager, commandsSent, new AtomicBoolean(true));
157+
158+
assertThrows(CommandTargetOverloadedException.class, () ->
159+
handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1));
160+
assertEquals(1, commandsSent.size());
161+
}
162+
163+
164+
private ContainerHealthResult.OverReplicatedHealthResult getOverReplicatedHealthResult() {
165+
ContainerHealthResult.OverReplicatedHealthResult
166+
healthResult = mock(ContainerHealthResult.OverReplicatedHealthResult.class);
167+
when(healthResult.getContainerInfo()).thenReturn(container);
168+
return healthResult;
169+
}
170+
171+
}

0 commit comments

Comments
 (0)