Skip to content

Commit c6b1ffc

Browse files
authored
IGNITE-24266 Add new temporary benchmarks that cover inserting/reading when colocation feature is enabled (#5133)
1 parent d61bbf6 commit c6b1ffc

File tree

11 files changed

+434
-21
lines changed

11 files changed

+434
-21
lines changed

modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.internal.replicator;
1919

20-
import java.util.Objects;
2120
import java.util.regex.Pattern;
2221

2322
/**
@@ -120,6 +119,12 @@ public boolean equals(Object o) {
120119

121120
@Override
122121
public int hashCode() {
123-
return Objects.hash(zoneId, partId, tableId);
122+
int result = 1;
123+
124+
result = 31 * result + zoneId;
125+
result = 31 * result + partId;
126+
result = 31 * result + tableId;
127+
128+
return result;
124129
}
125130
}

modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public class DistributionZoneRebalanceEngine {
8585
// TODO: after switching to zone-based replication
8686
private final DistributionZoneRebalanceEngineV2 distributionZoneRebalanceEngineV2;
8787

88-
public static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
8988
/* Feature flag for zone based collocation track */
9089
// TODO IGNITE-22115 remove it
91-
public static final boolean ENABLED = getBoolean(FEATURE_FLAG_NAME, false);
90+
private static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
91+
private final boolean enabledColocationFeature = getBoolean(FEATURE_FLAG_NAME, false);
9292

9393
/** Special flag to skip rebalance on node recovery for tests. */
9494
// TODO: IGNITE-23561 Remove it
@@ -149,7 +149,7 @@ protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters para
149149
return nullCompletedFuture();
150150
}
151151

152-
if (ENABLED) {
152+
if (enabledColocationFeature) {
153153
return rebalanceTriggersRecovery(recoveryRevision, catalogVersion)
154154
.thenCompose(v -> distributionZoneRebalanceEngineV2.startAsync());
155155
} else {
@@ -193,7 +193,7 @@ public void stop() {
193193
return;
194194
}
195195

196-
if (ENABLED) {
196+
if (enabledColocationFeature) {
197197
distributionZoneRebalanceEngineV2.stop();
198198
}
199199

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,10 @@
145145
*/
146146
public class PartitionReplicaLifecycleManager extends
147147
AbstractEventProducer<LocalPartitionReplicaEvent, LocalPartitionReplicaEventParameters> implements IgniteComponent {
148-
public static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
149148
/* Feature flag for zone based collocation track */
150149
// TODO IGNITE-22115 remove it
151-
public static final boolean ENABLED = getBoolean(FEATURE_FLAG_NAME, false);
150+
public static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
151+
private final boolean enabledColocationFeature = getBoolean(FEATURE_FLAG_NAME, false);
152152

153153
private final CatalogManager catalogMgr;
154154

@@ -270,7 +270,7 @@ public PartitionReplicaLifecycleManager(
270270

271271
@Override
272272
public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
273-
if (!ENABLED) {
273+
if (!enabledColocationFeature) {
274274
return nullCompletedFuture();
275275
}
276276

@@ -1181,7 +1181,7 @@ private Assignment localMemberAssignment(Assignments assignments) {
11811181

11821182
@Override
11831183
public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
1184-
if (!ENABLED) {
1184+
if (!enabledColocationFeature) {
11851185
return nullCompletedFuture();
11861186
}
11871187

modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
226226

227227
/* Temporary converter to support the zone based partitions in tests. **/
228228
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this code
229-
private Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r -> r.groupId().asReplicationGroupId();
229+
private volatile Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r -> r.groupId().asReplicationGroupId();
230230

231231
private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
232232

@@ -295,6 +295,11 @@ public ReplicaManager(
295295
this.groupIdConverter = groupIdConverter;
296296
}
297297

298+
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this method.
299+
public void groupIdConverter(Function<ReplicaRequest, ReplicationGroupId> converter) {
300+
groupIdConverter = converter;
301+
}
302+
298303
/**
299304
* Constructor for a replica service.
300305
*
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmark;
19+
20+
import static org.apache.ignite.catalog.definitions.ColumnDefinition.column;
21+
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
22+
import static org.apache.ignite.internal.testframework.TestIgnitionManager.PRODUCTION_CLUSTER_CONFIG_STRING;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import org.apache.ignite.catalog.ColumnType;
27+
import org.apache.ignite.catalog.definitions.TableDefinition;
28+
import org.apache.ignite.catalog.definitions.ZoneDefinition;
29+
import org.apache.ignite.internal.replicator.TablePartitionId;
30+
import org.apache.ignite.internal.replicator.ZonePartitionId;
31+
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
32+
import org.apache.ignite.table.KeyValueView;
33+
import org.apache.ignite.table.Table;
34+
import org.apache.ignite.table.Tuple;
35+
import org.openjdk.jmh.annotations.Param;
36+
37+
/**
38+
* Base class that allows to measure basic KeyValue operations for tables that share the same distribution zone.
39+
* TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this benchmark.
40+
*/
41+
public class AbstractColocationBenchmark extends AbstractMultiNodeBenchmark {
42+
/** Name of shared zone. */
43+
private static final String SHARED_ZONE_NAME = "shared_zone";
44+
45+
/** System property that allows to enable/disable colocation feature. */
46+
private static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
47+
48+
protected final List<KeyValueView<Tuple, Tuple>> tableViews = new ArrayList<>();
49+
50+
@Param({"32"})
51+
private int partitionCount;
52+
53+
@Param({"1", "32", "64"})
54+
private int tableCount;
55+
56+
@Param({"true"})
57+
private boolean tinySchemaSyncWaits;
58+
59+
@Param({"false", "true"})
60+
private boolean tableZoneColocationEnabled;
61+
62+
@Override
63+
protected int nodes() {
64+
return 1;
65+
}
66+
67+
@Override
68+
protected int replicaCount() {
69+
return 1;
70+
}
71+
72+
@Override
73+
protected int partitionCount() {
74+
return partitionCount;
75+
}
76+
77+
@Override
78+
protected String clusterConfiguration() {
79+
if (tinySchemaSyncWaits()) {
80+
return super.clusterConfiguration();
81+
} else {
82+
// Return a magic string that explicitly requests production defaults.
83+
return PRODUCTION_CLUSTER_CONFIG_STRING;
84+
}
85+
}
86+
87+
@Override
88+
protected void createDistributionZoneOnStartup() {
89+
ZoneDefinition zone = ZoneDefinition.builder(SHARED_ZONE_NAME)
90+
.partitions(partitionCount())
91+
.replicas(replicaCount())
92+
.storageProfiles(DEFAULT_STORAGE_PROFILE)
93+
.build();
94+
95+
publicIgnite.catalog().createZone(zone);
96+
}
97+
98+
@Override
99+
protected void createTablesOnStartup() {
100+
for (int i = 1; i <= tableCount(); ++i) {
101+
TableDefinition tableDefinition = TableDefinition.builder("test_table_" + i)
102+
.columns(
103+
column("id", ColumnType.INTEGER),
104+
column("company", ColumnType.varchar(32)))
105+
.primaryKey("id")
106+
.zone(SHARED_ZONE_NAME)
107+
.build();
108+
109+
Table t = publicIgnite.catalog().createTable(tableDefinition);
110+
111+
tableViews.add(t.keyValueView());
112+
}
113+
}
114+
115+
@Override
116+
public void nodeSetUp() throws Exception {
117+
boolean colocationFeatureEnabled = enableColocationFeature();
118+
119+
// Enable/disable collocation feature.
120+
System.setProperty(FEATURE_FLAG_NAME, Boolean.toString(colocationFeatureEnabled));
121+
122+
// Start the cluster and initialize it.
123+
super.nodeSetUp();
124+
125+
// Patch replica manager to propagate table replication messages to zone replication groups.
126+
if (colocationFeatureEnabled) {
127+
int catalogVersion = igniteImpl
128+
.catalogManager()
129+
.latestCatalogVersion();
130+
131+
int zoneId = igniteImpl
132+
.catalogManager()
133+
.catalog(catalogVersion)
134+
.zone(SHARED_ZONE_NAME.toUpperCase())
135+
.id();
136+
137+
igniteImpl.replicaManager().groupIdConverter(request -> {
138+
if (!(request instanceof WriteIntentSwitchReplicaRequest)) {
139+
if (request.groupId().asReplicationGroupId() instanceof TablePartitionId) {
140+
TablePartitionId tablePartitionId = (TablePartitionId) request.groupId().asReplicationGroupId();
141+
142+
return new ZonePartitionId(zoneId, tablePartitionId.partitionId());
143+
}
144+
}
145+
146+
return request.groupId().asReplicationGroupId();
147+
});
148+
}
149+
}
150+
151+
protected boolean enableColocationFeature() {
152+
return tableZoneColocationEnabled;
153+
}
154+
155+
protected int tableCount() {
156+
return tableCount;
157+
}
158+
159+
protected boolean tinySchemaSyncWaits() {
160+
return tinySchemaSyncWaits;
161+
}
162+
}

modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public void nodeSetUp() throws Exception {
9292
// Create a new zone on the cluster's start-up.
9393
createDistributionZoneOnStartup();
9494

95-
// Create a table on the cluster's start-up.
96-
createTableOnStartup();
95+
// Create tables on the cluster's start-up.
96+
createTablesOnStartup();
9797
} catch (Throwable th) {
9898
nodeTearDown();
9999

@@ -110,7 +110,7 @@ protected void createDistributionZoneOnStartup() {
110110
}
111111
}
112112

113-
protected void createTableOnStartup() {
113+
protected void createTablesOnStartup() {
114114
createTable(TABLE_NAME);
115115
}
116116

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmark;
19+
20+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import static org.openjdk.jmh.annotations.Mode.AverageTime;
23+
import static org.openjdk.jmh.annotations.Mode.Throughput;
24+
25+
import java.util.Map;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import org.apache.ignite.table.KeyValueView;
28+
import org.apache.ignite.table.Tuple;
29+
import org.openjdk.jmh.annotations.Benchmark;
30+
import org.openjdk.jmh.annotations.BenchmarkMode;
31+
import org.openjdk.jmh.annotations.Fork;
32+
import org.openjdk.jmh.annotations.Measurement;
33+
import org.openjdk.jmh.annotations.OutputTimeUnit;
34+
import org.openjdk.jmh.annotations.Scope;
35+
import org.openjdk.jmh.annotations.State;
36+
import org.openjdk.jmh.annotations.Threads;
37+
import org.openjdk.jmh.annotations.Warmup;
38+
import org.openjdk.jmh.runner.Runner;
39+
import org.openjdk.jmh.runner.RunnerException;
40+
import org.openjdk.jmh.runner.options.Options;
41+
import org.openjdk.jmh.runner.options.OptionsBuilder;
42+
43+
/**
44+
* This benchmark allows to measure inserting key value pairs via KeyValue API for tables that share the same distribution zone.
45+
* TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this benchmark.
46+
*/
47+
@Fork(1)
48+
@State(Scope.Benchmark)
49+
public class ColocationInsertBenchmark extends AbstractColocationBenchmark {
50+
private final Tuple tuple = Tuple.create().set("company", "Apache");
51+
52+
private final AtomicInteger counter = new AtomicInteger();
53+
54+
/**
55+
* Measures throughput of key-value api for tables that share the same distribution zone.
56+
*/
57+
@Benchmark
58+
@Threads(Threads.MAX)
59+
@Warmup(iterations = 5, time = 2)
60+
@Measurement(iterations = 10, time = 2)
61+
@BenchmarkMode(Throughput)
62+
@OutputTimeUnit(SECONDS)
63+
public void insertKeyValueApiThroughput() {
64+
doPut();
65+
}
66+
67+
/**
68+
* Measures average time of inserting a key-value pair for tables that share the same distribution zone.
69+
*/
70+
@Benchmark
71+
@Threads(1)
72+
@Warmup(iterations = 5, time = 2)
73+
@Measurement(iterations = 10, time = 2)
74+
@BenchmarkMode(AverageTime)
75+
@OutputTimeUnit(MICROSECONDS)
76+
public void insertKeyValueApiAverage() {
77+
doPut();
78+
}
79+
80+
private void doPut() {
81+
int currentId = counter.getAndIncrement();
82+
int tableIdx = currentId % tableViews.size();
83+
84+
KeyValueView<Tuple, Tuple> kvView = tableViews.get(tableIdx);
85+
86+
publicIgnite.transactions().runInTransaction(tx -> {
87+
kvView.putAll(tx, Map.of(Tuple.create().set("id", currentId), tuple));
88+
});
89+
}
90+
91+
/**
92+
* Benchmark's entry point.
93+
*/
94+
public static void main(String[] args) throws RunnerException {
95+
Options opt = new OptionsBuilder()
96+
.include(".*" + ColocationInsertBenchmark.class.getSimpleName() + ".*")
97+
.build();
98+
99+
new Runner(opt).run();
100+
}
101+
}

0 commit comments

Comments
 (0)