Skip to content

Commit d21f3fe

Browse files
committed
support disable_read_repair_mutation and disable_block_on_read_repair
1 parent e332e77 commit d21f3fe

11 files changed

+226
-5
lines changed

src/java/org/apache/cassandra/config/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ public class Config
315315
public int otc_coalescing_enough_coalesced_messages = 8;
316316

317317
public volatile boolean coerce_read_consistency_all = false;
318+
public volatile boolean disable_read_repair_mutation = false;
319+
public volatile boolean disable_block_on_read_repair = false;
318320

319321
public int windows_timer_interval = 0;
320322

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,4 +2106,24 @@ public static void setCoerceReadConsistencyAll(boolean value)
21062106
{
21072107
conf.coerce_read_consistency_all = value;
21082108
}
2109+
2110+
public static boolean getDisableReadRepairMutation()
2111+
{
2112+
return conf.disable_read_repair_mutation;
2113+
}
2114+
2115+
public static void setDisableReadRepairMutation(boolean value)
2116+
{
2117+
conf.disable_read_repair_mutation = value;
2118+
}
2119+
2120+
public static boolean getDisableBlockOnReadRepair()
2121+
{
2122+
return conf.disable_block_on_read_repair;
2123+
}
2124+
2125+
public static void setDisableBlockOnReadRepair(boolean value)
2126+
{
2127+
conf.disable_block_on_read_repair = value;
2128+
}
21092129
}

src/java/org/apache/cassandra/service/RowDataResolver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import com.google.common.collect.Iterables;
2828

29+
import org.apache.cassandra.config.DatabaseDescriptor;
2930
import org.apache.cassandra.db.*;
3031
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
3132
import org.apache.cassandra.db.filter.IDiskAtomFilter;
@@ -109,6 +110,11 @@ public Row resolve() throws DigestMismatchException
109110
*/
110111
public static List<AsyncOneResponse> scheduleRepairs(ColumnFamily resolved, String keyspaceName, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
111112
{
113+
if (DatabaseDescriptor.getDisableReadRepairMutation())
114+
{
115+
return Collections.emptyList();
116+
}
117+
112118
List<AsyncOneResponse> results = new ArrayList<AsyncOneResponse>(versions.size());
113119

114120
for (int i = 0; i < versions.size(); i++)

src/java/org/apache/cassandra/service/StorageProxy.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,9 +1556,12 @@ else if (logger.isDebugEnabled())
15561556
RowDataResolver resolver = (RowDataResolver)handler.resolver;
15571557
try
15581558
{
1559-
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
1560-
// behind on writes in case the out-of-sync row is read multiple times in quick succession
1561-
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
1559+
if (!DatabaseDescriptor.getDisableBlockOnReadRepair())
1560+
{
1561+
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
1562+
// behind on writes in case the out-of-sync row is read multiple times in quick succession
1563+
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
1564+
}
15621565
}
15631566
catch (TimeoutException e)
15641567
{

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5528,14 +5528,36 @@ public void setWriteDelay(int writeDelay) {
55285528
logger.info("Updated write_delay_in_s to {}", SafeArg.of("value", writeDelay));
55295529
}
55305530

5531+
public boolean getCoerceReadConsistencyAll()
5532+
{
5533+
return DatabaseDescriptor.getCoerceReadConsistencyAll();
5534+
}
5535+
55315536
public void setCoerceReadConsistencyAll(boolean value)
55325537
{
55335538
DatabaseDescriptor.setCoerceReadConsistencyAll(value);
55345539
logger.info("Updated coerce_read_consistency_level to {}", SafeArg.of("value", value));
55355540
}
55365541

5537-
public boolean getCoerceReadConsistencyAll()
5542+
public boolean getDisableReadRepairMutation()
55385543
{
5539-
return DatabaseDescriptor.getCoerceReadConsistencyAll();
5544+
return DatabaseDescriptor.getDisableReadRepairMutation();
5545+
}
5546+
5547+
public void setDisableReadRepairMutation(boolean value)
5548+
{
5549+
DatabaseDescriptor.setDisableReadRepairMutation(value);
5550+
logger.info("Updated disable_read_repair_mutation to {}", SafeArg.of("value", value));
5551+
}
5552+
5553+
public boolean getDisableBlockOnReadRepair()
5554+
{
5555+
return DatabaseDescriptor.getDisableBlockOnReadRepair();
5556+
}
5557+
5558+
public void setDisableBlockOnReadRepair(boolean value)
5559+
{
5560+
DatabaseDescriptor.setDisableBlockOnReadRepair(value);
5561+
logger.info("Updated disable_block_on_read_repair to {}", SafeArg.of("value", value));
55405562
}
55415563
}

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,4 +936,12 @@ public enum ProgressState
936936
public boolean getCoerceReadConsistencyAll();
937937

938938
public void setCoerceReadConsistencyAll(boolean value);
939+
940+
public boolean getDisableReadRepairMutation();
941+
942+
public void setDisableReadRepairMutation(boolean value);
943+
944+
public boolean getDisableBlockOnReadRepair();
945+
946+
public void setDisableBlockOnReadRepair(boolean value);
939947
}

src/java/org/apache/cassandra/tools/NodeProbe.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,26 @@ public void setCoerceReadConsistencyAll(boolean value)
11941194
ssProxy.setCoerceReadConsistencyAll(value);
11951195
}
11961196

1197+
public boolean getDisableReadRepairMutation()
1198+
{
1199+
return ssProxy.getDisableReadRepairMutation();
1200+
}
1201+
1202+
public void setDisableReadRepairMutation(boolean value)
1203+
{
1204+
ssProxy.setDisableReadRepairMutation(value);
1205+
}
1206+
1207+
public boolean getDisableBlockOnReadRepair()
1208+
{
1209+
return ssProxy.getDisableBlockOnReadRepair();
1210+
}
1211+
1212+
public void setDisableBlockOnReadRepair(boolean value)
1213+
{
1214+
ssProxy.setDisableBlockOnReadRepair(value);
1215+
}
1216+
11971217
// JMX getters for the o.a.c.metrics API below.
11981218
/**
11991219
* Retrieve cache metrics based on the cache type (KeyCache, RowCache, or CounterCache)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.tools.nodetool;
20+
21+
import io.airlift.command.Command;
22+
import org.apache.cassandra.tools.NodeProbe;
23+
import org.apache.cassandra.tools.NodeTool;
24+
25+
@Command(name = "getdisableblockonreadrepair", description = "Check if blocking on read repair is disabled")
26+
public class GetDisableBlockOnReadRepair extends NodeTool.NodeToolCmd
27+
{
28+
@Override
29+
public void execute(NodeProbe probe)
30+
{
31+
probe.output().out.println(probe.getDisableBlockOnReadRepair());
32+
}
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.tools.nodetool;
20+
21+
import io.airlift.command.Command;
22+
import org.apache.cassandra.tools.NodeProbe;
23+
import org.apache.cassandra.tools.NodeTool;
24+
25+
@Command(name = "getdisablereadrepairmutation", description = "Check if read repair mutations are disabled")
26+
public class GetDisableReadRepairMutation extends NodeTool.NodeToolCmd
27+
{
28+
@Override
29+
public void execute(NodeProbe probe)
30+
{
31+
probe.output().out.println(probe.getDisableReadRepairMutation());
32+
}
33+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.tools.nodetool;
20+
21+
import io.airlift.command.Arguments;
22+
import io.airlift.command.Command;
23+
import org.apache.cassandra.tools.NodeProbe;
24+
import org.apache.cassandra.tools.NodeTool;
25+
26+
@Command(name = "setdisableblockonreadrepair", description = "Disable block on read repair")
27+
public class SetDisableBlockOnReadRepair extends NodeTool.NodeToolCmd
28+
{
29+
@Arguments(title = "flag", description = "true, or false (default)", required = true)
30+
private String flag;
31+
32+
@Override
33+
public void execute(NodeProbe probe)
34+
{
35+
probe.setDisableBlockOnReadRepair(Boolean.parseBoolean(flag));
36+
}
37+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.tools.nodetool;
20+
21+
import io.airlift.command.Arguments;
22+
import io.airlift.command.Command;
23+
import org.apache.cassandra.tools.NodeProbe;
24+
import org.apache.cassandra.tools.NodeTool;
25+
26+
@Command(name = "setdisablereadrepairmutation", description = "Disable read repair mutations")
27+
public class SetDisableReadRepairMutation extends NodeTool.NodeToolCmd
28+
{
29+
@Arguments(title = "flag", description = "true, or false (default)", required = true)
30+
private String flag;
31+
32+
@Override
33+
public void execute(NodeProbe probe)
34+
{
35+
probe.setDisableReadRepairMutation(Boolean.parseBoolean(flag));
36+
}
37+
}

0 commit comments

Comments
 (0)