Skip to content

Commit c91513a

Browse files
st1pageyuhao-su
andauthored
fix(watermark): align watermark for idle input #11554 to v1.1 (#11605)
Co-authored-by: Yuhao Su <[email protected]>
1 parent 924cc8e commit c91513a

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

src/stream/src/executor/watermark_filter.rs

+19
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
130130
current_watermark.clone(),
131131
));
132132

133+
// If the input is idle
134+
let mut idle_input = true;
135+
133136
#[for_await]
134137
for msg in input {
135138
let msg = msg?;
@@ -180,6 +183,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
180183
yield Message::Chunk(output_chunk);
181184
};
182185

186+
idle_input = false;
183187
yield Message::Watermark(Watermark::new(
184188
event_time_col_idx,
185189
watermark_type.clone(),
@@ -192,6 +196,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
192196
let watermark = watermark.val;
193197
if current_watermark.default_cmp(&watermark).is_lt() {
194198
current_watermark = watermark;
199+
idle_input = false;
195200
yield Message::Watermark(Watermark::new(
196201
event_time_col_idx,
197202
watermark_type.clone(),
@@ -229,6 +234,20 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
229234
table.insert(row);
230235
}
231236
table.commit(barrier.epoch).await?;
237+
238+
if idle_input {
239+
// Align watermark
240+
let global_max_watermark =
241+
Self::get_global_max_watermark(&table, watermark_type.clone())
242+
.await?;
243+
current_watermark = cmp::max_by(
244+
current_watermark,
245+
global_max_watermark,
246+
DefaultOrd::default_cmp,
247+
);
248+
} else {
249+
idle_input = true;
250+
}
232251
} else {
233252
table.commit_no_data_expected(barrier.epoch);
234253
}

0 commit comments

Comments
 (0)