Skip to content

Commit 55a95df

Browse files
committed
[FLINK-38199] Add SinkUpsertMaterializerBenchmark
1 parent 43d0184 commit 55a95df

File tree

2 files changed

+306
-1
lines changed

2 files changed

+306
-1
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ under the License.
4545

4646
<properties>
4747
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
48-
<flink.version>2.2-SNAPSHOT</flink.version>
48+
<flink.version>2.1-SNAPSHOT</flink.version>
4949
<flink.shaded.version>20.0</flink.shaded.version>
5050
<netty.tcnative.version>2.0.62.Final</netty.tcnative.version>
5151
<java.version>1.8</java.version>
@@ -114,6 +114,13 @@ under the License.
114114
<type>test-jar</type>
115115
</dependency>
116116

117+
<dependency>
118+
<groupId>org.apache.flink</groupId>
119+
<artifactId>flink-table-runtime</artifactId>
120+
<version>${flink.version}</version>
121+
<type>test-jar</type>
122+
</dependency>
123+
117124
<dependency>
118125
<groupId>org.apache.flink</groupId>
119126
<artifactId>flink-test-utils-junit</artifactId>
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.benchmark;
20+
21+
import org.apache.flink.api.common.state.StateTtlConfig;
22+
import org.apache.flink.runtime.state.StateBackend;
23+
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
24+
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
25+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
26+
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
27+
import org.apache.flink.table.data.RowData;
28+
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
29+
import org.apache.flink.table.runtime.generated.HashFunction;
30+
import org.apache.flink.table.runtime.generated.RecordEqualiser;
31+
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
32+
import org.apache.flink.table.runtime.operators.multipleinput.output.BlackHoleOutput;
33+
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
34+
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
35+
import org.apache.flink.table.runtime.util.StateConfigUtil;
36+
import org.apache.flink.table.types.logical.BigIntType;
37+
import org.apache.flink.table.types.logical.IntType;
38+
import org.apache.flink.table.types.logical.LogicalType;
39+
import org.apache.flink.table.types.logical.RowType;
40+
import org.apache.flink.table.types.logical.VarCharType;
41+
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
42+
43+
import org.openjdk.jmh.annotations.*;
44+
import org.openjdk.jmh.runner.Runner;
45+
import org.openjdk.jmh.runner.RunnerException;
46+
import org.openjdk.jmh.runner.options.OptionsBuilder;
47+
import org.openjdk.jmh.runner.options.VerboseMode;
48+
49+
import java.nio.charset.StandardCharsets;
50+
import java.util.Objects;
51+
import java.util.Random;
52+
53+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
54+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
55+
import static org.apache.flink.util.Preconditions.checkState;
56+
57+
/**
58+
* Benchmark for {@link SinkUpsertMaterializer}.
59+
* <p>
60+
* It inserts a fixed number of records (inserts and deletes) in a loop and measures the time it takes to handle all the records. It uses a test harness, not the full Flink job.
61+
* <p>
62+
The benchmark has the following parameters:
63+
<ul>
64+
65+
<li>total number of records - fixed to 10K; all records are inserted under the same stream key</li>
66+
<li>hasUpsertKey: true/false</li>
67+
<li>stateBackend type: HEAP/RocksDB</li>
68+
<li>payload size - fixed to 100 bytes; bigger payload degrades faster on RocksDB</li>
69+
<li>retractPercentage, retractDelay - how many records to retract, and from which record to start retracting; both parameters control how frequently retraction happens and how long the history is.
70+
Retraction is performed from the middle of the list. Retraction is inefficient in the current implementation on long lists.
71+
</ul>
72+
73+
* Results on M3Pro:
74+
*
75+
* <pre>{@code
76+
Benchmark (hasUpsertKey) (payloadSize) (retractDelay) (retractPercentage) (stateBackend) (sumVersion) Mode Cnt Score Error Units
77+
SinkUpsertMaterializerBenchmark.run false 100 1 100 HEAP V1 thrpt 4138.808 ops/ms
78+
SinkUpsertMaterializerBenchmark.run false 100 1 100 ROCKSDB V1 thrpt 284.055 ops/ms
79+
SinkUpsertMaterializerBenchmark.run false 100 10 100 HEAP V1 thrpt 3729.824 ops/ms
80+
SinkUpsertMaterializerBenchmark.run false 100 10 100 ROCKSDB V1 thrpt 205.047 ops/ms
81+
SinkUpsertMaterializerBenchmark.run false 100 100 100 HEAP V1 thrpt 4137.591 ops/ms
82+
SinkUpsertMaterializerBenchmark.run false 100 100 100 ROCKSDB V1 thrpt 80.406 ops/ms
83+
SinkUpsertMaterializerBenchmark.run false 100 200 100 HEAP V1 thrpt 1886.574 ops/ms
84+
SinkUpsertMaterializerBenchmark.run false 100 200 100 ROCKSDB V1 thrpt 30.935 ops/ms
85+
SinkUpsertMaterializerBenchmark.run false 100 1000 100 HEAP V1 thrpt 546.826 ops/ms
86+
SinkUpsertMaterializerBenchmark.run false 100 1000 100 ROCKSDB V1 thrpt 7.081 ops/ms
87+
SinkUpsertMaterializerBenchmark.run true 100 1 100 HEAP V1 thrpt 4006.263 ops/ms
88+
SinkUpsertMaterializerBenchmark.run true 100 1 100 ROCKSDB V1 thrpt 297.556 ops/ms
89+
SinkUpsertMaterializerBenchmark.run true 100 10 100 HEAP V1 thrpt 3240.089 ops/ms
90+
SinkUpsertMaterializerBenchmark.run true 100 10 100 ROCKSDB V1 thrpt 209.375 ops/ms
91+
SinkUpsertMaterializerBenchmark.run true 100 100 100 HEAP V1 thrpt 2131.445 ops/ms
92+
SinkUpsertMaterializerBenchmark.run true 100 100 100 ROCKSDB V1 thrpt 78.209 ops/ms
93+
SinkUpsertMaterializerBenchmark.run true 100 200 100 HEAP V1 thrpt 652.936 ops/ms
94+
SinkUpsertMaterializerBenchmark.run true 100 200 100 ROCKSDB V1 thrpt 29.674 ops/ms
95+
SinkUpsertMaterializerBenchmark.run true 100 1000 100 HEAP V1 thrpt 118.567 ops/ms
96+
SinkUpsertMaterializerBenchmark.run true 100 1000 100 ROCKSDB V1 thrpt 6.426 ops/ms
97+
* }</pre>
98+
*/
99+
@OperationsPerInvocation(value = SinkUpsertMaterializerBenchmark.RECORDS_PER_INVOCATION)
100+
@SuppressWarnings("ConstantValue")
101+
public class SinkUpsertMaterializerBenchmark extends BenchmarkBase {
102+
103+
private static final int STREAM_KEY = 0;
104+
105+
public static void main(String[] args) throws RunnerException {
106+
new Runner(new OptionsBuilder()
107+
.verbosity(VerboseMode.NORMAL)
108+
// speedup
109+
// .warmupIterations(1)
110+
// .measurementIterations(1)
111+
// .forks(1)
112+
// .warmupTime(TimeValue.milliseconds(100))
113+
// .measurementTime(TimeValue.seconds(1))
114+
.include(".*" + SinkUpsertMaterializerBenchmark.class.getCanonicalName() + ".*")
115+
.build()).run();
116+
}
117+
118+
@Benchmark
119+
public void run(SumBmState state) throws Exception {
120+
for (long record = 0; record < state.numRecordsTotal; record++) {
121+
state.harness.processElement(insertRecord(record, STREAM_KEY, state.payload));
122+
if (state.shouldRetract(record)) {
123+
state.harness.processElement(
124+
deleteRecord(record - state.retractOffset, STREAM_KEY, state.payload));
125+
}
126+
}
127+
}
128+
129+
protected static final int RECORDS_PER_INVOCATION = 10_000;
130+
131+
@State(Scope.Thread)
132+
public static class SumBmState {
133+
134+
@Param({"false", "true"}) public boolean hasUpsertKey;
135+
136+
@Param({"HEAP", "ROCKSDB"}) public SumStateBackend stateBackend;
137+
138+
public int numRecordsTotal;
139+
140+
// larger payload amplifies any inefficiencies but slows down the benchmark; mostly affects rocksdb
141+
@Param("100")
142+
public int payloadSize;
143+
144+
// lower retraction percentage implies longer history, making retractions even harder (unless percentage = 0)
145+
@Param("100")
146+
public int retractPercentage;
147+
148+
// higher retraction delay leaves longer history, making retractions even harder (unless percentage = 0)
149+
// for automated runs, reduce the run time (and the data points) to the most common cases
150+
@Param({"1", "100"})
151+
// for comparison, these values might be useful
152+
// @Param({"1", "10", "100", "200", "1000"})
153+
public int retractDelay;
154+
155+
// the lower the value, the closer to the end of the list is the element to retract, the harder for V1 to find the element
156+
public long retractOffset;
157+
158+
public String payload;
159+
160+
public KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness;
161+
162+
@Setup(Level.Invocation)
163+
public void initSumBmState() throws Exception {
164+
harness = getHarness(createSum(hasUpsertKey), stateBackend);
165+
payload = generatePayload(payloadSize);
166+
numRecordsTotal = RECORDS_PER_INVOCATION;
167+
retractOffset = (1 + retractDelay) / 2;
168+
checkState(numRecordsTotal > retractDelay);
169+
checkState(retractPercentage >= 0 && retractPercentage <= 100);
170+
}
171+
172+
173+
@TearDown(Level.Invocation)
174+
public void teardown() throws Exception {
175+
this.harness.close();
176+
this.harness = null;
177+
}
178+
179+
public boolean shouldRetract(long record) {
180+
return retractEnabled() && (retractEverything() || retractRecord(record));
181+
}
182+
183+
private boolean retractEnabled() {
184+
return retractPercentage > 0;
185+
}
186+
187+
private boolean retractEverything() {
188+
return retractPercentage == 100;
189+
}
190+
191+
private boolean retractRecord(long index) {
192+
return index >= retractDelay && index % 100 < retractPercentage;
193+
}
194+
}
195+
196+
private static String generatePayload(int size) {
197+
final byte[] bytes = new byte[size];
198+
new Random().nextBytes(bytes);
199+
return new String(bytes, StandardCharsets.UTF_8);
200+
}
201+
202+
private static final int UPSERT_KEY_POS = 0;
203+
private static final int STREAM_KEY_POS = 1;
204+
private static final int PAYLOAD_POS = 2;
205+
private static final LogicalType[] types =
206+
new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
207+
208+
private static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> getHarness(
209+
OneInputStreamOperator<RowData, RowData> materializer, SumStateBackend stateBackend) throws Exception {
210+
RowDataKeySelector rowDataSelector =
211+
HandwrittenSelectorUtil.getRowDataSelector(new int[] {STREAM_KEY_POS}, types);
212+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
213+
materializer, rowDataSelector, rowDataSelector.getProducedType());
214+
testHarness.getExecutionConfig().setMaxParallelism(2048);
215+
testHarness.setStateBackend(stateBackend.create(true));
216+
testHarness.setOutputCreator(ign -> new BlackHoleOutput()); // requires change in Flink
217+
testHarness.open();
218+
return testHarness;
219+
}
220+
221+
private static OneInputStreamOperator<RowData, RowData> createSum(boolean hasUpsertKey) {
222+
StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(0); // no ttl
223+
RowType physicalRowType = RowType.of(types);
224+
int[] inputUpsertKey = hasUpsertKey ? new int[] {UPSERT_KEY_POS} : null;
225+
return new SinkUpsertMaterializer(
226+
ttlConfig,
227+
InternalSerializers.create(physicalRowType),
228+
equalizer(),
229+
hasUpsertKey ? upsertEqualizer() : null,
230+
inputUpsertKey);
231+
}
232+
private static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
233+
@Override
234+
public boolean equals(RowData row1, RowData row2) {
235+
return row1.getRowKind() == row2.getRowKind()
236+
&& row1.getLong(UPSERT_KEY_POS) == row2.getLong(UPSERT_KEY_POS)
237+
&& row1.getInt(STREAM_KEY_POS) == row2.getInt(STREAM_KEY_POS)
238+
&& row1.getString(PAYLOAD_POS).equals(row2.getString(PAYLOAD_POS));
239+
}
240+
241+
@Override
242+
public int hashCode(Object data) {
243+
RowData rd = (RowData) data;
244+
return Objects.hash(
245+
rd.getLong(UPSERT_KEY_POS), rd.getInt(STREAM_KEY_POS), rd.getString(PAYLOAD_POS));
246+
}
247+
}
248+
249+
private static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction {
250+
@Override
251+
public boolean equals(RowData row1, RowData row2) {
252+
return row1.getRowKind() == row2.getRowKind()
253+
&& row1.getLong(UPSERT_KEY_POS) == row2.getLong(UPSERT_KEY_POS);
254+
}
255+
256+
@Override
257+
public int hashCode(Object data) {
258+
return Long.hashCode(((RowData) data).getLong(UPSERT_KEY_POS));
259+
}
260+
}
261+
262+
private static GeneratedRecordEqualiser equalizer() {
263+
return new GeneratedRecordEqualiser("", "", new Object[0]) {
264+
265+
@Override
266+
public RecordEqualiser newInstance(ClassLoader classLoader) {
267+
return new TestRecordEqualiser();
268+
}
269+
};
270+
}
271+
272+
private static GeneratedRecordEqualiser upsertEqualizer() {
273+
return new GeneratedRecordEqualiser("", "", new Object[0]) {
274+
275+
@Override
276+
public RecordEqualiser newInstance(ClassLoader classLoader) {
277+
return new TestUpsertKeyEqualiser();
278+
}
279+
};
280+
}
281+
282+
public enum SumStateBackend {
283+
HEAP {
284+
285+
public StateBackend create(boolean incrementalIfSupported) {
286+
return new HashMapStateBackend();
287+
}
288+
},
289+
ROCKSDB {
290+
291+
public StateBackend create(boolean incrementalIfSupported) {
292+
return new EmbeddedRocksDBStateBackend(incrementalIfSupported);
293+
}
294+
};
295+
296+
public abstract StateBackend create(boolean incrementalIfSupported);
297+
}
298+
}

0 commit comments

Comments
 (0)