Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36936][table] Introduce UpdatableTopNFunction in Rank with Async State API #25819

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@
import org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateUpdatableTopNFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.runtime.util.StateConfigUtil;
@@ -86,7 +87,7 @@
@ExecNodeMetadata(
name = "stream-exec-rank",
version = 1,
consumedOptions = {"table.exec.rank.topn-cache-size"},
consumedOptions = {"table.exec.rank.topn-cache-size", "table.exec.async-state.enabled"},
producedTransformations = StreamExecRank.RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
@@ -346,18 +347,33 @@ protected Transformation<RowData> translateToPlanInternal(
planner.getFlinkContext().getClassLoader(),
primaryKeys,
inputRowTypeInfo);
processFunction =
new UpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
if (isAsyncStateEnabled) {
processFunction =
new AsyncStateUpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
} else {
processFunction =
new UpdatableTopNFunction(
ttlConfig,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
}
}
} else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
EqualiserCodeGenerator equaliserCodeGen =
Original file line number Diff line number Diff line change
@@ -268,8 +268,6 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean)
DataTypes.INT().getLogicalType,
DataTypes.BIGINT().getLogicalType))

assertThat(isAsyncStateOperator(testHarness)).isFalse

(testHarness, assertor)
}

Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import java.util.Objects;

@@ -36,8 +35,6 @@ public abstract class AbstractSyncStateTopNFunction extends AbstractTopNFunction

private ValueState<Long> rankEndState;

protected long rankEnd;

public AbstractSyncStateTopNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
@@ -101,30 +98,4 @@ protected long initRankEnd(RowData row) throws Exception {
}
}
}

// ====== utility methods that omit the specified rank end ======

protected boolean isInRankEnd(long rank) {
return rank <= rankEnd;
}

protected boolean isInRankRange(long rank) {
return rank <= rankEnd && rank >= rankStart;
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) {
collectDelete(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateBefore(out, inputRow, rank, rankEnd);
}
}
Original file line number Diff line number Diff line change
@@ -80,6 +80,8 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData,

protected final long rankStart;

protected long rankEnd;

// constant rank end
// if rank end is variable, this var is null
@Nullable protected final Long constantRankEnd;
@@ -228,6 +230,10 @@ protected void registerMetric(long heapSize, long requestCount, long hitCount) {
.<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectInsert(
Collector<RowData> out, RowData inputRow, long rank, long rankEnd) {
if (isInRankRange(rank, rankEnd)) {
@@ -282,6 +288,26 @@ protected boolean isInRankRange(long rank, long rankEnd) {
return rank <= rankEnd && rank >= rankStart;
}

public boolean isInRankEnd(long rank) {
return rank <= rankEnd;
}

protected boolean isInRankRange(long rank) {
return rank <= rankEnd && rank >= rankStart;
}

protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) {
collectDelete(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
collectUpdateBefore(out, inputRow, rank, rankEnd);
}

protected boolean hasOffset() {
// rank start is 1-based
return rankStart > 1;
@@ -343,6 +369,10 @@ protected void collectInsert(
topNFunction.collectInsert(out, inputRow, rank, rankEnd);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectInsert(out, inputRow, rank);
}

protected void collectInsert(Collector<RowData> out, RowData inputRow) {
topNFunction.collectInsert(out, inputRow);
}
@@ -361,6 +391,10 @@ protected void collectUpdateAfter(
topNFunction.collectUpdateAfter(out, inputRow, rank, rankEnd);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectUpdateAfter(out, inputRow, rank);
}

protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow) {
topNFunction.collectUpdateAfter(out, inputRow);
}
@@ -370,10 +404,18 @@ protected void collectUpdateBefore(
topNFunction.collectUpdateBefore(out, inputRow, rank, rankEnd);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) {
topNFunction.collectUpdateBefore(out, inputRow, rank);
}

protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow) {
topNFunction.collectUpdateBefore(out, inputRow);
}

protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer) {
return topNFunction.checkSortKeyInBufferRange(sortKey, buffer);
}

protected boolean isInRankEnd(long rank, long rankEnd) {
return rank <= rankEnd;
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -89,6 +89,7 @@ public void open(OpenContext openContext) throws Exception {
*/
protected StateFuture<Long> initRankEnd(RowData row) {
if (isConstantRankEnd) {
rankEnd = Objects.requireNonNull(constantRankEnd);
return StateFutureUtils.completedFuture(Objects.requireNonNull(constantRankEnd));
} else {
return rankEndState
@@ -98,9 +99,11 @@ protected StateFuture<Long> initRankEnd(RowData row) {
long curRankEnd = rankEndFetcher.apply(row);
if (rankEndInState == null) {
// no need to wait this future
rankEnd = curRankEnd;
rankEndState.asyncUpdate(curRankEnd);
return curRankEnd;
} else {
rankEnd = rankEndInState;
if (rankEndInState != curRankEnd) {
// increment the invalid counter when the current rank end
// not equal to previous rank end
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.rank.async;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.rank.utils.RankRow;
import org.apache.flink.table.runtime.operators.rank.utils.UpdatableTopNHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;

/**
* A TopN function could handle updating stream. It is a fast version of {@link
* RetractableTopNFunction} which only hold top n data in state, and keep sorted map in heap.
* However, the function only works in some special scenarios: 1. sort field collation is ascending
* and its mono is decreasing, or sort field collation is descending and its mono is increasing 2.
* input data has unique keys and unique key must contain partition key 3. input stream could not
* contain DELETE record or UPDATE_BEFORE record
*/
public class AsyncStateUpdatableTopNFunction extends AbstractAsyncStateTopNFunction
implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private final InternalTypeInfo<RowData> rowKeyType;
private final long cacheSize;

// a map state stores mapping from row key to record which is in topN
// in tuple2, f0 is the record row, f1 is the index in the list of the same sort_key
// the f1 is used to preserve the record order in the same sort_key
private transient MapState<RowData, Tuple2<RowData, Integer>> dataState;

// a HashMap stores mapping from rowKey to record, a heap mirror to dataState
private transient Map<RowData, RankRow> rowKeyMap;

private final TypeSerializer<RowData> inputRowSer;
private final KeySelector<RowData, RowData> rowKeySelector;

private transient AsyncStateUpdatableTopNHelper helper;

public AsyncStateUpdatableTopNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
RowDataKeySelector rowKeySelector,
GeneratedRecordComparator generatedRecordComparator,
RowDataKeySelector sortKeySelector,
RankType rankType,
RankRange rankRange,
boolean generateUpdateBefore,
boolean outputRankNumber,
long cacheSize) {
super(
ttlConfig,
inputRowType,
generatedRecordComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
this.rowKeyType = rowKeySelector.getProducedType();
this.cacheSize = cacheSize;
this.inputRowSer = inputRowType.createSerializer(new SerializerConfigImpl());
this.rowKeySelector = rowKeySelector;
}

@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);

TupleTypeInfo<Tuple2<RowData, Integer>> valueTypeInfo =
new TupleTypeInfo<>(inputRowType, Types.INT);
MapStateDescriptor<RowData, Tuple2<RowData, Integer>> mapStateDescriptor =
new MapStateDescriptor<>("data-state-with-update", rowKeyType, valueTypeInfo);
if (ttlConfig.isEnabled()) {
mapStateDescriptor.enableTimeToLive(ttlConfig);
}
dataState = ((StreamingRuntimeContext) getRuntimeContext()).getMapState(mapStateDescriptor);

helper = new AsyncStateUpdatableTopNHelper();

// metrics
helper.registerMetric();
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// nothing to do
}

@Override
public void processElement(RowData input, Context context, Collector<RowData> out)
throws Exception {
helper.initHeapStates();

initRankEnd(input)
.thenAccept(
VOID -> {
if (outputRankNumber || hasOffset()) {
// the without-number-algorithm can't handle topN with offset,
// so use the with-number-algorithm to handle offset
helper.processElementWithRowNumber(input, out);
} else {
helper.processElementWithoutRowNumber(input, out);
}
});
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
helper.flushAllCacheToState();
}

private class AsyncStateUpdatableTopNHelper extends UpdatableTopNHelper {
private AsyncStateUpdatableTopNHelper() {
super(
AsyncStateUpdatableTopNFunction.this,
cacheSize,
getDefaultTopNSize(),
rowKeySelector,
inputRowSer,
inputRowType.getDataType(),
getRuntimeContext().getUserCodeClassLoader());
}

@Override
protected void flushBufferToState(
RowData currentKey, Tuple2<TopNBuffer, Map<RowData, RankRow>> bufferEntry)
throws Exception {
((AsyncStateProcessingOperator) keyContext)
.asyncProcessWithKey(
currentKey,
() -> {
Map<RowData, RankRow> curRowKeyMap = bufferEntry.f1;
for (Map.Entry<RowData, RankRow> entry : curRowKeyMap.entrySet()) {
RowData key = entry.getKey();
RankRow rankRow = entry.getValue();
if (rankRow.dirty) {
// should update state
dataState.put(
key, Tuple2.of(rankRow.row, rankRow.innerRank));
rankRow.dirty = false;
}
}
});
}

@Override
protected void removeDataState(RowData rowKey) throws Exception {
// no need to wait for the future
dataState.remove(rowKey);
}

@Override
protected Iterator<Map.Entry<RowData, Tuple2<RowData, Integer>>> getDataStateIterator() {
return dataState.iterator();
}

@Override
protected boolean isInRankEnd(long rank) {
return AsyncStateUpdatableTopNFunction.this.isInRankEnd(rank);
}

@Override
protected long getRankEnd() {
return rankEnd;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.flink.table.runtime.operators.rank.utils;

import org.apache.flink.table.data.RowData;

public class RankRow {
public final RowData row;
public int innerRank;
public boolean dirty;

public RankRow(RowData row, int innerRank, boolean dirty) {
this.row = row;
this.innerRank = innerRank;
this.dirty = dirty;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -20,8 +20,11 @@

import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateUpdatableTopNFunction;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.List;
@@ -32,6 +35,7 @@
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;

/** Tests for {@link UpdatableTopNFunction}. */
@ExtendWith(ParameterizedTestExtension.class)
class UpdatableTopNFunctionTest extends TopNFunctionTestBase {

@Override
@@ -41,22 +45,36 @@ protected AbstractTopNFunction createFunction(
boolean generateUpdateBefore,
boolean outputRankNumber,
boolean enableAsyncState) {
return new UpdatableTopNFunction(
ttlConfig,
inputRowType,
rowKeySelector,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
if (enableAsyncState) {
return new AsyncStateUpdatableTopNFunction(
ttlConfig,
inputRowType,
rowKeySelector,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
} else {
return new UpdatableTopNFunction(
ttlConfig,
inputRowType,
rowKeySelector,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
}
}

@Override
boolean supportedAsyncState() {
return false;
return true;
}

@TestTemplate