Skip to content

Commit c83f5a5

Browse files
committed
[FLINK-34558][state] Support tracking state size
1 parent 2d9fb79 commit c83f5a5

File tree

52 files changed

+1750
-703
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1750
-703
lines changed

flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public static final class Sections {
7979
public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";
8080

8181
public static final String STATE_LATENCY_TRACKING = "state_latency_tracking";
82+
public static final String STATE_SIZE_TRACKING = "state_size_tracking";
8283

8384
public static final String STATE_CHANGELOG = "state_changelog";
8485

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.configuration;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.annotation.docs.Documentation;
23+
24+
/** A collection of all configuration options that relate to the size tracking for state access. */
25+
@PublicEvolving
26+
public class StateSizeTrackOptions {
27+
28+
@Documentation.Section(Documentation.Sections.STATE_SIZE_TRACKING)
29+
public static final ConfigOption<Boolean> SIZE_TRACK_ENABLED =
30+
ConfigOptions.key("state.backend.size-track.keyed-state-enabled")
31+
.booleanType()
32+
.defaultValue(false)
33+
.withDescription(
34+
"Whether to track size of keyed state operations, e.g value state put/get/clear.");
35+
36+
@Documentation.Section(Documentation.Sections.STATE_SIZE_TRACKING)
37+
public static final ConfigOption<Integer> SIZE_TRACK_SAMPLE_INTERVAL =
38+
ConfigOptions.key("state.backend.size-track.sample-interval")
39+
.intType()
40+
.defaultValue(100)
41+
.withDescription(
42+
String.format(
43+
"The sample interval of size track once '%s' is enabled. "
44+
+ "The default value is 100, which means we would track the size every 100 access requests.",
45+
SIZE_TRACK_ENABLED.key()));
46+
47+
@Documentation.Section(Documentation.Sections.STATE_SIZE_TRACKING)
48+
public static final ConfigOption<Integer> SIZE_TRACK_HISTORY_SIZE =
49+
ConfigOptions.key("state.backend.size-track.history-size")
50+
.intType()
51+
.defaultValue(128)
52+
.withDescription(
53+
"Defines the number of measured size to maintain at each state access operation.");
54+
55+
@Documentation.Section(Documentation.Sections.STATE_SIZE_TRACKING)
56+
public static final ConfigOption<Boolean> SIZE_TRACK_STATE_NAME_AS_VARIABLE =
57+
ConfigOptions.key("state.backend.size-track.state-name-as-variable")
58+
.booleanType()
59+
.defaultValue(true)
60+
.withDescription(
61+
"Whether to expose state name as a variable if tracking size.");
62+
}

flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
5353
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
5454
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
55+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
5556
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
5657
import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation;
5758
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
@@ -132,6 +133,7 @@ private static CountingKeysKeyedStateBackend createCountingKeysKeyedStateBackend
132133
env.getExecutionConfig(),
133134
ttlTimeProvider,
134135
LatencyTrackingStateConfig.disabled(),
136+
SizeTrackingStateConfig.disabled(),
135137
cancelStreamRegistry,
136138
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups));
137139
}
@@ -269,6 +271,7 @@ public CountingKeysKeyedStateBackend(
269271
ExecutionConfig executionConfig,
270272
TtlTimeProvider ttlTimeProvider,
271273
LatencyTrackingStateConfig latencyTrackingStateConfig,
274+
SizeTrackingStateConfig sizeTrackingStateConfig,
272275
CloseableRegistry cancelStreamRegistry,
273276
InternalKeyContext<Integer> keyContext) {
274277
super(
@@ -278,6 +281,7 @@ public CountingKeysKeyedStateBackend(
278281
executionConfig,
279282
ttlTimeProvider,
280283
latencyTrackingStateConfig,
284+
sizeTrackingStateConfig,
281285
cancelStreamRegistry,
282286
keyContext);
283287
this.numberOfKeysGenerated = numberOfKeysGenerated;

flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.flink.runtime.state.internal.InternalListState;
4545
import org.apache.flink.runtime.state.internal.InternalMapState;
4646
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
47+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
4748
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
4849

4950
import org.junit.jupiter.api.Test;
@@ -336,6 +337,7 @@ private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key,
336337
executionConfig,
337338
TtlTimeProvider.DEFAULT,
338339
LatencyTrackingStateConfig.disabled(),
340+
SizeTrackingStateConfig.disabled(),
339341
Collections.emptyList(),
340342
AbstractStateBackend.getCompressionDecorator(executionConfig),
341343
TestLocalRecoveryConfig.disabled(),

flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.runtime.state.internal.InternalKvState;
3232
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
3333
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
34+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
3435
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
3536
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
3637
import org.apache.flink.util.IOUtils;
@@ -93,6 +94,8 @@ public abstract class AbstractKeyedStateBackend<K>
9394

9495
protected final LatencyTrackingStateConfig latencyTrackingStateConfig;
9596

97+
protected final SizeTrackingStateConfig sizeTrackingStateConfig;
98+
9699
/** Decorates the input and output streams to write key-groups compressed. */
97100
protected final StreamCompressionDecorator keyGroupCompressionDecorator;
98101

@@ -106,6 +109,7 @@ public AbstractKeyedStateBackend(
106109
ExecutionConfig executionConfig,
107110
TtlTimeProvider ttlTimeProvider,
108111
LatencyTrackingStateConfig latencyTrackingStateConfig,
112+
SizeTrackingStateConfig sizeTrackingStateConfig,
109113
CloseableRegistry cancelStreamRegistry,
110114
InternalKeyContext<K> keyContext) {
111115
this(
@@ -115,6 +119,7 @@ public AbstractKeyedStateBackend(
115119
executionConfig,
116120
ttlTimeProvider,
117121
latencyTrackingStateConfig,
122+
sizeTrackingStateConfig,
118123
cancelStreamRegistry,
119124
determineStreamCompression(executionConfig),
120125
keyContext);
@@ -127,6 +132,7 @@ public AbstractKeyedStateBackend(
127132
ExecutionConfig executionConfig,
128133
TtlTimeProvider ttlTimeProvider,
129134
LatencyTrackingStateConfig latencyTrackingStateConfig,
135+
SizeTrackingStateConfig sizeTrackingStateConfig,
130136
CloseableRegistry cancelStreamRegistry,
131137
StreamCompressionDecorator keyGroupCompressionDecorator,
132138
InternalKeyContext<K> keyContext) {
@@ -137,6 +143,7 @@ public AbstractKeyedStateBackend(
137143
executionConfig,
138144
ttlTimeProvider,
139145
latencyTrackingStateConfig,
146+
sizeTrackingStateConfig,
140147
cancelStreamRegistry,
141148
keyGroupCompressionDecorator,
142149
Preconditions.checkNotNull(keyContext),
@@ -157,6 +164,7 @@ protected AbstractKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedSt
157164
abstractKeyedStateBackend.executionConfig,
158165
abstractKeyedStateBackend.ttlTimeProvider,
159166
abstractKeyedStateBackend.latencyTrackingStateConfig,
167+
abstractKeyedStateBackend.sizeTrackingStateConfig,
160168
abstractKeyedStateBackend.cancelStreamRegistry,
161169
abstractKeyedStateBackend.keyGroupCompressionDecorator,
162170
abstractKeyedStateBackend.keyContext,
@@ -176,6 +184,7 @@ private AbstractKeyedStateBackend(
176184
ExecutionConfig executionConfig,
177185
TtlTimeProvider ttlTimeProvider,
178186
LatencyTrackingStateConfig latencyTrackingStateConfig,
187+
SizeTrackingStateConfig sizeTrackingStateConfig,
179188
CloseableRegistry cancelStreamRegistry,
180189
StreamCompressionDecorator keyGroupCompressionDecorator,
181190
InternalKeyContext<K> keyContext,
@@ -206,6 +215,7 @@ private AbstractKeyedStateBackend(
206215
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
207216
this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
208217
this.latencyTrackingStateConfig = Preconditions.checkNotNull(latencyTrackingStateConfig);
218+
this.sizeTrackingStateConfig = Preconditions.checkNotNull(sizeTrackingStateConfig);
209219
this.keySelectionListeners = keySelectionListeners;
210220
this.lastState = lastState;
211221
this.lastName = lastName;
@@ -383,8 +393,10 @@ public <N, S extends State, V> S getOrCreateKeyedState(
383393
LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
384394
TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
385395
namespaceSerializer, stateDescriptor, this, ttlTimeProvider),
396+
this,
386397
stateDescriptor,
387-
latencyTrackingStateConfig);
398+
latencyTrackingStateConfig,
399+
sizeTrackingStateConfig);
388400
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
389401
publishQueryableStateIfEnabled(stateDescriptor, kvState);
390402
}
@@ -451,6 +463,10 @@ public LatencyTrackingStateConfig getLatencyTrackingStateConfig() {
451463
return latencyTrackingStateConfig;
452464
}
453465

466+
public SizeTrackingStateConfig getSizeTrackingStateConfig() {
467+
return sizeTrackingStateConfig;
468+
}
469+
454470
@VisibleForTesting
455471
public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
456472
return keyGroupCompressionDecorator;

flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.core.fs.CloseableRegistry;
2424
import org.apache.flink.runtime.query.TaskKvStateRegistry;
2525
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
26+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
2627
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
2728

2829
import org.slf4j.Logger;
@@ -45,6 +46,7 @@ public abstract class AbstractKeyedStateBackendBuilder<K>
4546
protected final ExecutionConfig executionConfig;
4647
protected final TtlTimeProvider ttlTimeProvider;
4748
protected final LatencyTrackingStateConfig latencyTrackingStateConfig;
49+
protected final SizeTrackingStateConfig sizeTrackingStateConfig;
4850
protected final StreamCompressionDecorator keyGroupCompressionDecorator;
4951
protected final Collection<KeyedStateHandle> restoreStateHandles;
5052
protected final CloseableRegistry cancelStreamRegistry;
@@ -58,6 +60,7 @@ public AbstractKeyedStateBackendBuilder(
5860
ExecutionConfig executionConfig,
5961
TtlTimeProvider ttlTimeProvider,
6062
LatencyTrackingStateConfig latencyTrackingStateConfig,
63+
SizeTrackingStateConfig sizeTrackingStateConfig,
6164
@Nonnull Collection<KeyedStateHandle> stateHandles,
6265
StreamCompressionDecorator keyGroupCompressionDecorator,
6366
CloseableRegistry cancelStreamRegistry) {
@@ -70,6 +73,7 @@ public AbstractKeyedStateBackendBuilder(
7073
this.executionConfig = executionConfig;
7174
this.ttlTimeProvider = ttlTimeProvider;
7275
this.latencyTrackingStateConfig = latencyTrackingStateConfig;
76+
this.sizeTrackingStateConfig = sizeTrackingStateConfig;
7377
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
7478
this.restoreStateHandles = stateHandles;
7579
this.cancelStreamRegistry = cancelStreamRegistry;

flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.annotation.PublicEvolving;
2222
import org.apache.flink.api.common.ExecutionConfig;
2323
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
24+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
2425

2526
import java.io.IOException;
2627

@@ -47,6 +48,9 @@ public static StreamCompressionDecorator getCompressionDecorator(
4748
protected LatencyTrackingStateConfig.Builder latencyTrackingConfigBuilder =
4849
LatencyTrackingStateConfig.newBuilder();
4950

51+
protected SizeTrackingStateConfig.Builder sizeTrackingConfigBuilder =
52+
SizeTrackingStateConfig.newBuilder();
53+
5054
// ------------------------------------------------------------------------
5155
// State Backend - State-Holding Backends
5256
// ------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
3434
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
3535
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
36+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
3637

3738
import java.io.IOException;
3839

@@ -71,6 +72,7 @@ public HashMapStateBackend() {}
7172
private HashMapStateBackend(HashMapStateBackend original, ReadableConfig config) {
7273
// configure latency tracking
7374
latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
75+
sizeTrackingConfigBuilder = original.sizeTrackingConfigBuilder.configure(config);
7476
}
7577

7678
@Override
@@ -101,6 +103,8 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
101103

102104
LatencyTrackingStateConfig latencyTrackingStateConfig =
103105
latencyTrackingConfigBuilder.setMetricGroup(parameters.getMetricGroup()).build();
106+
SizeTrackingStateConfig sizeTrackingStateConfig =
107+
sizeTrackingConfigBuilder.setMetricGroup(parameters.getMetricGroup()).build();
104108
return new HeapKeyedStateBackendBuilder<>(
105109
parameters.getKvStateRegistry(),
106110
parameters.getKeySerializer(),
@@ -110,6 +114,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
110114
parameters.getEnv().getExecutionConfig(),
111115
parameters.getTtlTimeProvider(),
112116
latencyTrackingStateConfig,
117+
sizeTrackingStateConfig,
113118
parameters.getStateHandles(),
114119
getCompressionDecorator(parameters.getEnv().getExecutionConfig()),
115120
localRecoveryConfig,

flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java

+3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.flink.runtime.state.StateSnapshotTransformers;
5252
import org.apache.flink.runtime.state.StreamCompressionDecorator;
5353
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
54+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
5455
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
5556
import org.apache.flink.util.FlinkRuntimeException;
5657
import org.apache.flink.util.StateMigrationException;
@@ -141,6 +142,7 @@ public HeapKeyedStateBackend(
141142
ExecutionConfig executionConfig,
142143
TtlTimeProvider ttlTimeProvider,
143144
LatencyTrackingStateConfig latencyTrackingStateConfig,
145+
SizeTrackingStateConfig sizeTrackingStateConfig,
144146
CloseableRegistry cancelStreamRegistry,
145147
StreamCompressionDecorator keyGroupCompressionDecorator,
146148
Map<String, StateTable<K, ?, ?>> registeredKVStates,
@@ -158,6 +160,7 @@ public HeapKeyedStateBackend(
158160
executionConfig,
159161
ttlTimeProvider,
160162
latencyTrackingStateConfig,
163+
sizeTrackingStateConfig,
161164
cancelStreamRegistry,
162165
keyGroupCompressionDecorator,
163166
keyContext);

flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
3434
import org.apache.flink.runtime.state.StreamCompressionDecorator;
3535
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
36+
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
3637
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
3738

3839
import javax.annotation.Nonnull;
@@ -69,6 +70,7 @@ public HeapKeyedStateBackendBuilder(
6970
ExecutionConfig executionConfig,
7071
TtlTimeProvider ttlTimeProvider,
7172
LatencyTrackingStateConfig latencyTrackingStateConfig,
73+
SizeTrackingStateConfig sizeTrackingStateConfig,
7274
@Nonnull Collection<KeyedStateHandle> stateHandles,
7375
StreamCompressionDecorator keyGroupCompressionDecorator,
7476
LocalRecoveryConfig localRecoveryConfig,
@@ -84,6 +86,7 @@ public HeapKeyedStateBackendBuilder(
8486
executionConfig,
8587
ttlTimeProvider,
8688
latencyTrackingStateConfig,
89+
sizeTrackingStateConfig,
8790
stateHandles,
8891
keyGroupCompressionDecorator,
8992
cancelStreamRegistry);
@@ -115,6 +118,7 @@ public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
115118
executionConfig,
116119
ttlTimeProvider,
117120
latencyTrackingStateConfig,
121+
sizeTrackingStateConfig,
118122
cancelStreamRegistryForBackend,
119123
keyGroupCompressionDecorator,
120124
registeredKVStates,

0 commit comments

Comments
 (0)