Skip to content

Commit bfe3576

Browse files
authored
[FLINK-37025] Fix generating watermarks in SQL on-periodic (#25921) (#25933)
1 parent 28fedea commit bfe3576

File tree

2 files changed

+112
-2
lines changed

2 files changed

+112
-2
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.runtime.stream.sql;
20+
21+
import org.apache.flink.configuration.CoreOptions;
22+
import org.apache.flink.table.data.TimestampData;
23+
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
24+
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
25+
import org.apache.flink.types.Row;
26+
import org.apache.flink.util.CollectionUtil;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.time.LocalDateTime;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
import java.util.stream.Collectors;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
/** IT tests for verifying watermarks behaviour. */
38+
class WatermarkITCase extends StreamingTestBase {
39+
40+
@Test
41+
void testWatermarkNotMovingBack() {
42+
List<Row> data =
43+
Arrays.asList(
44+
Row.of(1, LocalDateTime.parse("2024-01-01T00:00:00")),
45+
Row.of(3, LocalDateTime.parse("2024-01-03T00:00:00")),
46+
Row.of(2, LocalDateTime.parse("2024-01-02T00:00:00")));
47+
48+
String dataId = TestValuesTableFactory.registerData(data);
49+
50+
final String ddl =
51+
String.format(
52+
"CREATE Table VirtualTable (\n"
53+
+ " a INT,\n"
54+
+ " c TIMESTAMP(3),\n"
55+
+ " WATERMARK FOR c as c\n"
56+
+ ") with (\n"
57+
+ " 'connector' = 'values',\n"
58+
+ " 'bounded' = 'false',\n"
59+
+ " 'scan.watermark.emit.strategy' = 'on-periodic',\n"
60+
+ " 'enable-watermark-push-down' = 'true',\n"
61+
+ " 'disable-lookup' = 'true',\n"
62+
+ " 'data-id' = '%s'\n"
63+
+ ")\n",
64+
dataId);
65+
66+
tEnv().executeSql(ddl);
67+
tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
68+
String query = "SELECT a, c, current_watermark(c) FROM VirtualTable order by c";
69+
70+
final List<Row> result = CollectionUtil.iteratorToList(tEnv().executeSql(query).collect());
71+
final List<String> actualWatermarks =
72+
TestValuesTableFactory.getWatermarkOutput("VirtualTable").stream()
73+
.map(
74+
x ->
75+
TimestampData.fromEpochMillis(x.getTimestamp())
76+
.toLocalDateTime()
77+
.toString())
78+
.collect(Collectors.toList());
79+
80+
// Underneath, we use FromElementSourceFunctionWithWatermark which is a SourceFunction.
81+
// SourceFunction does not support watermark moving back. SourceStreamTask does not support
82+
// WatermarkGenerator natively. The test implementation calls
83+
// WatermarkGenerator#onPeriodicEmit
84+
// after each record, which makes the test deterministic.
85+
// Additionally, the GeneratedWatermarkGeneratorSupplier does not deduplicate already
86+
// emitted
87+
// watermarks. This is usually handled by the target WatermarkOutput. In this test, we do
88+
// not deduplicate watermarks because we use TestValuesWatermarkOutput.
89+
// Given the fact watermarks are generated after every record and we don't deduplicate them,
90+
// we have "2024-01-03T00:00" twice in the expected watermarks.
91+
assertThat(actualWatermarks)
92+
.containsExactly("2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00");
93+
assertThat(result)
94+
.containsExactly(
95+
Row.of(
96+
1,
97+
LocalDateTime.parse("2024-01-01T00:00"),
98+
LocalDateTime.parse("2024-01-01T00:00")),
99+
Row.of(
100+
2,
101+
LocalDateTime.parse("2024-01-02T00:00"),
102+
LocalDateTime.parse("2024-01-03T00:00")),
103+
Row.of(
104+
3,
105+
LocalDateTime.parse("2024-01-03T00:00"),
106+
LocalDateTime.parse("2024-01-03T00:00")));
107+
}
108+
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import javax.annotation.Nullable;
3131

32+
import java.io.Serializable;
3233
import java.util.ArrayList;
3334
import java.util.Arrays;
3435
import java.util.List;
@@ -84,7 +85,8 @@ public GeneratedWatermarkGeneratorSupplier(
8485

8586
/** Wrapper of the code-generated {@link WatermarkGenerator}. */
8687
public static class DefaultWatermarkGenerator
87-
implements org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> {
88+
implements org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData>,
89+
Serializable {
8890
private static final long serialVersionUID = 1L;
8991

9092
private final WatermarkGenerator innerWatermarkGenerator;
@@ -102,7 +104,7 @@ public DefaultWatermarkGenerator(
102104
public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
103105
try {
104106
Long watermark = innerWatermarkGenerator.currentWatermark(event);
105-
if (watermark != null) {
107+
if (watermark != null && watermark > currentWatermark) {
106108
currentWatermark = watermark;
107109
if (watermarkEmitStrategy.isOnEvent()) {
108110
output.emitWatermark(new Watermark(currentWatermark));

0 commit comments

Comments
 (0)