Skip to content

Commit b0c9652

Browse files
feat(streaming): lower high join amplification logging to > 2048 records, and rate limit at 1 per minute (#16957) (#16964)
Co-authored-by: Noel Kwan <[email protected]>
1 parent 920c50c commit b0c9652

File tree

4 files changed

+36
-5
lines changed

4 files changed

+36
-5
lines changed

src/common/src/config.rs

+9
Original file line numberDiff line numberDiff line change
@@ -933,6 +933,11 @@ pub struct StreamingDeveloperConfig {
933933
/// If true, the arrangement backfill will be disabled,
934934
/// even if session variable set.
935935
pub enable_arrangement_backfill: bool,
936+
937+
#[serde(default = "default::developer::stream_high_join_amplification_threshold")]
938+
/// If the number of hash join matches exceeds this threshold,
939+
/// it will be logged.
940+
pub high_join_amplification_threshold: usize,
936941
}
937942

938943
/// The subsections `[batch.developer]`.
@@ -1647,6 +1652,10 @@ pub mod default {
16471652
pub fn stream_enable_arrangement_backfill() -> bool {
16481653
true
16491654
}
1655+
1656+
pub fn stream_high_join_amplification_threshold() -> usize {
1657+
2048
1658+
}
16501659
}
16511660

16521661
pub use crate::system_param::default as system;

src/config/example.toml

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ stream_memory_controller_threshold_aggressive = 0.9
110110
stream_memory_controller_threshold_graceful = 0.8
111111
stream_memory_controller_threshold_stable = 0.7
112112
stream_enable_arrangement_backfill = true
113+
stream_high_join_amplification_threshold = 2048
113114

114115
[storage]
115116
share_buffers_sync_parallelism = 1

src/stream/src/executor/hash_join.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
1514
use std::collections::{BTreeMap, HashSet};
15+
use std::num::NonZeroU32;
1616
use std::sync::LazyLock;
1717
use std::time::Duration;
1818

19+
use governor::{Quota, RateLimiter};
1920
use itertools::Itertools;
2021
use multimap::MultiMap;
2122
use risingwave_common::array::{Op, RowRef};
@@ -159,6 +160,8 @@ pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitiv
159160

160161
/// watermark column index -> `BufferedWatermarks`
161162
watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
163+
164+
high_join_amplification_threshold: usize,
162165
}
163166

164167
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
@@ -195,6 +198,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
195198
append_only_optimize: bool,
196199
chunk_size: usize,
197200
cnt_rows_received: &'a mut u32,
201+
high_join_amplification_threshold: usize,
198202
}
199203

200204
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
@@ -218,6 +222,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
218222
is_append_only: bool,
219223
metrics: Arc<StreamingMetrics>,
220224
chunk_size: usize,
225+
high_join_amplification_threshold: usize,
221226
) -> Self {
222227
let side_l_column_n = input_l.schema().len();
223228

@@ -446,6 +451,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
446451
chunk_size,
447452
cnt_rows_received: 0,
448453
watermark_buffers,
454+
high_join_amplification_threshold,
449455
}
450456
}
451457

@@ -539,6 +545,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
539545
append_only_optimize: self.append_only_optimize,
540546
chunk_size: self.chunk_size,
541547
cnt_rows_received: &mut self.cnt_rows_received,
548+
high_join_amplification_threshold: self.high_join_amplification_threshold,
542549
}) {
543550
left_time += left_start_time.elapsed();
544551
yield Message::Chunk(chunk?);
@@ -563,6 +570,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
563570
append_only_optimize: self.append_only_optimize,
564571
chunk_size: self.chunk_size,
565572
cnt_rows_received: &mut self.cnt_rows_received,
573+
high_join_amplification_threshold: self.high_join_amplification_threshold,
566574
}) {
567575
right_time += right_start_time.elapsed();
568576
yield Message::Chunk(chunk?);
@@ -777,6 +785,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
777785
append_only_optimize,
778786
chunk_size,
779787
cnt_rows_received,
788+
high_join_amplification_threshold,
780789
..
781790
} = args;
782791

@@ -832,13 +841,16 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
832841

833842
if let Some(rows) = &matched_rows {
834843
join_matched_join_keys.observe(rows.len() as _);
835-
if rows.len() >= 10000 {
836-
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
837-
LazyLock::new(LogSuppresser::default);
844+
if rows.len() > high_join_amplification_threshold {
845+
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(|| {
846+
LogSuppresser::new(RateLimiter::direct(Quota::per_minute(
847+
NonZeroU32::new(1).unwrap(),
848+
)))
849+
});
838850
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
839851
let join_key_data_types = side_update.ht.join_key_data_types();
840852
let key = key.deserialize(join_key_data_types)?;
841-
tracing::warn!(target: "hash_join_amplification",
853+
tracing::warn!(target: "high_join_amplification",
842854
suppressed_count,
843855
matched_rows_len = rows.len(),
844856
update_table_id = side_update.ht.table_id(),
@@ -1213,6 +1225,7 @@ mod tests {
12131225
false,
12141226
Arc::new(StreamingMetrics::unused()),
12151227
1024,
1228+
2048,
12161229
);
12171230
(tx_l, tx_r, executor.boxed().execute())
12181231
}
@@ -1305,6 +1318,7 @@ mod tests {
13051318
true,
13061319
Arc::new(StreamingMetrics::unused()),
13071320
1024,
1321+
2048,
13081322
);
13091323
(tx_l, tx_r, executor.boxed().execute())
13101324
}

src/stream/src/from_proto/hash_join.rs

+7
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ impl ExecutorBuilder for HashJoinExecutorBuilder {
155155
join_type_proto: node.get_join_type()?,
156156
join_key_data_types,
157157
chunk_size: params.env.config().developer.chunk_size,
158+
high_join_amplification_threshold: params
159+
.env
160+
.config()
161+
.developer
162+
.high_join_amplification_threshold,
158163
};
159164

160165
let exec = args.dispatch()?;
@@ -183,6 +188,7 @@ struct HashJoinExecutorDispatcherArgs<S: StateStore> {
183188
join_type_proto: JoinTypeProto,
184189
join_key_data_types: Vec<DataType>,
185190
chunk_size: usize,
191+
high_join_amplification_threshold: usize,
186192
}
187193

188194
impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
@@ -211,6 +217,7 @@ impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
211217
self.is_append_only,
212218
self.metrics,
213219
self.chunk_size,
220+
self.high_join_amplification_threshold,
214221
)
215222
.boxed())
216223
};

0 commit comments

Comments
 (0)