Skip to content

Commit a2e9711

Browse files
authored
feat(log-store): limit the max size of merged flushed chunk (#22285)
1 parent 247d0ce commit a2e9711

File tree

7 files changed

+56
-15
lines changed

7 files changed

+56
-15
lines changed

src/stream/src/common/log_store_impl/kv_log_store/buffer.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,7 @@ struct LogStoreBufferInner {
5757
consumed_queue: VecDeque<(u64, LogStoreBufferItem)>,
5858
row_count: usize,
5959
max_row_count: usize,
60+
chunk_size: usize,
6061

6162
truncation_list: VecDeque<ReaderTruncationOffsetType>,
6263

@@ -159,14 +160,18 @@ impl LogStoreBufferInner {
159160
end_seq_id: SeqId,
160161
new_vnode_bitmap: Bitmap,
161162
) {
163+
let curr_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
162164
if let Some((
163165
item_epoch,
164166
LogStoreBufferItem::Flushed {
167+
start_seq_id: prev_start_seq_id,
165168
end_seq_id: prev_end_seq_id,
166169
vnode_bitmap,
167170
..
168171
},
169172
)) = self.unconsumed_queue.front_mut()
173+
&& let prev_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
174+
&& curr_chunk_size + prev_chunk_size <= self.chunk_size
170175
{
171176
assert!(
172177
*prev_end_seq_id < start_seq_id,
@@ -436,13 +441,15 @@ impl LogStoreBufferReceiver {
436441

437442
pub(crate) fn new_log_store_buffer(
438443
max_row_count: usize,
444+
chunk_size: usize,
439445
metrics: KvLogStoreMetrics,
440446
) -> (LogStoreBufferSender, LogStoreBufferReceiver) {
441447
let buffer = SharedMutex::new(LogStoreBufferInner {
442448
unconsumed_queue: VecDeque::new(),
443449
consumed_queue: VecDeque::new(),
444450
row_count: 0,
445451
max_row_count,
452+
chunk_size,
446453
truncation_list: VecDeque::new(),
447454
next_chunk_id: 0,
448455
metrics,

src/stream/src/common/log_store_impl/kv_log_store/mod.rs

Lines changed: 29 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -488,7 +488,8 @@ pub struct KvLogStoreFactory<S: StateStore> {
488488

489489
vnodes: Option<Arc<Bitmap>>,
490490

491-
max_row_count: usize,
491+
max_buffer_row_count: usize,
492+
chunk_size: usize,
492493

493494
metrics: KvLogStoreMetrics,
494495

@@ -498,11 +499,13 @@ pub struct KvLogStoreFactory<S: StateStore> {
498499
}
499500

500501
impl<S: StateStore> KvLogStoreFactory<S> {
502+
#[expect(clippy::too_many_arguments)]
501503
pub(crate) fn new(
502504
state_store: S,
503505
table_catalog: Table,
504506
vnodes: Option<Arc<Bitmap>>,
505-
max_row_count: usize,
507+
max_buffer_row_count: usize,
508+
chunk_size: usize,
506509
metrics: KvLogStoreMetrics,
507510
identity: impl Into<String>,
508511
pk_info: &'static KvLogStorePkInfo,
@@ -511,7 +514,8 @@ impl<S: StateStore> KvLogStoreFactory<S> {
511514
state_store,
512515
table_catalog,
513516
vnodes,
514-
max_row_count,
517+
max_buffer_row_count,
518+
chunk_size,
515519
metrics,
516520
identity: identity.into(),
517521
pk_info,
@@ -543,9 +547,14 @@ impl<S: StateStore> LogStoreFactory for KvLogStoreFactory<S> {
543547
})
544548
.await;
545549

546-
let (tx, rx) = new_log_store_buffer(self.max_row_count, self.metrics.clone());
550+
let (tx, rx) = new_log_store_buffer(
551+
self.max_buffer_row_count,
552+
self.chunk_size,
553+
self.metrics.clone(),
554+
);
547555

548-
let (read_state, write_state) = new_log_store_state(table_id, local_state_store, serde);
556+
let (read_state, write_state) =
557+
new_log_store_state(table_id, local_state_store, serde, self.chunk_size);
549558

550559
let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
551560
let (update_vnode_bitmap_tx, update_vnode_bitmap_rx) = unbounded_channel();
@@ -633,6 +642,7 @@ mod tests {
633642
table.clone(),
634643
Some(Arc::new(bitmap)),
635644
max_row_count,
645+
1024,
636646
KvLogStoreMetrics::for_test(),
637647
"test",
638648
pk_info,
@@ -750,6 +760,7 @@ mod tests {
750760
table.clone(),
751761
Some(bitmap.clone()),
752762
max_row_count,
763+
1024,
753764
KvLogStoreMetrics::for_test(),
754765
"test",
755766
pk_info,
@@ -845,6 +856,7 @@ mod tests {
845856
table.clone(),
846857
Some(bitmap),
847858
max_row_count,
859+
1024,
848860
KvLogStoreMetrics::for_test(),
849861
"test",
850862
pk_info,
@@ -940,6 +952,7 @@ mod tests {
940952
table.clone(),
941953
Some(bitmap.clone()),
942954
max_row_count,
955+
1024,
943956
KvLogStoreMetrics::for_test(),
944957
"test",
945958
pk_info,
@@ -1061,6 +1074,7 @@ mod tests {
10611074
table.clone(),
10621075
Some(bitmap),
10631076
max_row_count,
1077+
1024,
10641078
KvLogStoreMetrics::for_test(),
10651079
"test",
10661080
pk_info,
@@ -1159,6 +1173,7 @@ mod tests {
11591173
table.clone(),
11601174
Some(vnodes1),
11611175
10 * TEST_DATA_SIZE,
1176+
1024,
11621177
KvLogStoreMetrics::for_test(),
11631178
"test",
11641179
pk_info,
@@ -1168,6 +1183,7 @@ mod tests {
11681183
table.clone(),
11691184
Some(vnodes2),
11701185
10 * TEST_DATA_SIZE,
1186+
1024,
11711187
KvLogStoreMetrics::for_test(),
11721188
"test",
11731189
pk_info,
@@ -1305,6 +1321,7 @@ mod tests {
13051321
table.clone(),
13061322
Some(vnodes),
13071323
10 * TEST_DATA_SIZE,
1324+
1024,
13081325
KvLogStoreMetrics::for_test(),
13091326
"test",
13101327
pk_info,
@@ -1376,6 +1393,7 @@ mod tests {
13761393
table.clone(),
13771394
Some(Arc::new(bitmap)),
13781395
0,
1396+
1024,
13791397
KvLogStoreMetrics::for_test(),
13801398
"test",
13811399
pk_info,
@@ -1524,6 +1542,7 @@ mod tests {
15241542
table.clone(),
15251543
Some(bitmap.clone()),
15261544
1024,
1545+
1024,
15271546
KvLogStoreMetrics::for_test(),
15281547
"test",
15291548
pk_info,
@@ -1642,6 +1661,7 @@ mod tests {
16421661
table.clone(),
16431662
Some(bitmap.clone()),
16441663
1024,
1664+
1024,
16451665
KvLogStoreMetrics::for_test(),
16461666
"test",
16471667
pk_info,
@@ -1711,6 +1731,7 @@ mod tests {
17111731
table.clone(),
17121732
Some(bitmap.clone()),
17131733
1024,
1734+
1024,
17141735
KvLogStoreMetrics::for_test(),
17151736
"test",
17161737
pk_info,
@@ -1883,6 +1904,7 @@ mod tests {
18831904
table.clone(),
18841905
Some(bitmap.clone()),
18851906
max_row_count,
1907+
1024,
18861908
KvLogStoreMetrics::for_test(),
18871909
"test",
18881910
pk_info,
@@ -1970,6 +1992,7 @@ mod tests {
19701992
table.clone(),
19711993
Some(bitmap.clone()),
19721994
max_row_count,
1995+
1024,
19731996
KvLogStoreMetrics::for_test(),
19741997
"test",
19751998
pk_info,
@@ -2042,6 +2065,7 @@ mod tests {
20422065
table.clone(),
20432066
Some(bitmap),
20442067
max_row_count,
2068+
1024,
20452069
KvLogStoreMetrics::for_test(),
20462070
"test",
20472071
pk_info,

src/stream/src/common/log_store_impl/kv_log_store/reader.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -765,12 +765,13 @@ impl<S: StateStoreRead> LogStoreReadState<S> {
765765
}
766766
}));
767767

768+
let chunk_size = self.chunk_size;
768769
streams_future.map_err(Into::into).map_ok(move |streams| {
769770
// chunk size comes from `self.chunk_size`, which callers supply (e.g. from the streaming developer config)
770771
Box::pin(merge_log_store_item_stream(
771772
streams,
772773
serde,
773-
1024,
774+
chunk_size,
774775
read_metrics,
775776
))
776777
})

src/stream/src/common/log_store_impl/kv_log_store/state.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ pub(crate) struct LogStoreReadState<S: StateStoreRead> {
3636
pub(super) table_id: TableId,
3737
pub(super) state_store: Arc<S>,
3838
pub(super) serde: LogStoreRowSerde,
39+
pub(super) chunk_size: usize,
3940
}
4041

4142
impl<S: StateStoreRead> LogStoreReadState<S> {
@@ -60,6 +61,7 @@ pub(crate) fn new_log_store_state<S: LocalStateStore>(
6061
table_id: TableId,
6162
state_store: S,
6263
serde: LogStoreRowSerde,
64+
chunk_size: usize,
6365
) -> (
6466
LogStoreReadState<S::FlushedSnapshotReader>,
6567
LogStoreWriteState<S>,
@@ -70,6 +72,7 @@ pub(crate) fn new_log_store_state<S: LocalStateStore>(
7072
table_id,
7173
state_store: Arc::new(flushed_reader),
7274
serde: serde.clone(),
75+
chunk_size,
7376
},
7477
LogStoreWriteState {
7578
state_store,
@@ -330,6 +333,7 @@ mod tests {
330333
})
331334
.await,
332335
serde.clone(),
336+
1024,
333337
);
334338
write_state
335339
.init(EpochPair::new_test_epoch(epoch1))

src/stream/src/executor/sync_kv_log_store.rs

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -318,7 +318,7 @@ pub struct SyncedKvLogStoreExecutor<S: StateStore> {
318318
max_buffer_size: usize,
319319

320320
// Max chunk size when reading from logstore / buffer
321-
chunk_size: u32,
321+
chunk_size: usize,
322322

323323
pause_duration_ms: Duration,
324324
}
@@ -332,7 +332,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
332332
serde: LogStoreRowSerde,
333333
state_store: S,
334334
buffer_size: usize,
335-
chunk_size: u32,
335+
chunk_size: usize,
336336
upstream: Executor,
337337
pause_duration_ms: Duration,
338338
) -> Self {
@@ -539,8 +539,12 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
539539
})
540540
.await;
541541

542-
let (mut read_state, mut initial_write_state) =
543-
new_log_store_state(self.table_id, local_state_store, self.serde);
542+
let (mut read_state, mut initial_write_state) = new_log_store_state(
543+
self.table_id,
544+
local_state_store,
545+
self.serde,
546+
self.chunk_size,
547+
);
544548
initial_write_state.init(first_write_epoch).await?;
545549

546550
let mut pause_stream = first_barrier.is_pause_on_startup();
@@ -958,7 +962,7 @@ struct SyncedLogStoreBuffer {
958962
buffer: VecDeque<(u64, LogStoreBufferItem)>,
959963
current_size: usize,
960964
max_size: usize,
961-
max_chunk_size: u32,
965+
max_chunk_size: usize,
962966
next_chunk_id: ChunkId,
963967
metrics: SyncedKvLogStoreMetrics,
964968
flushed_count: usize,
@@ -1005,7 +1009,7 @@ impl SyncedLogStoreBuffer {
10051009
new_vnode_bitmap: Bitmap,
10061010
epoch: u64,
10071011
) {
1008-
let new_chunk_size = end_seq_id - start_seq_id + 1;
1012+
let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
10091013

10101014
if let Some((
10111015
item_epoch,
@@ -1016,9 +1020,9 @@ impl SyncedLogStoreBuffer {
10161020
..
10171021
},
10181022
)) = self.buffer.back_mut()
1019-
&& let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
1023+
&& let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
10201024
&& let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
1021-
&& projected_flushed_chunk_size as u32 <= self.max_chunk_size
1025+
&& projected_flushed_chunk_size <= self.max_chunk_size
10221026
{
10231027
assert!(
10241028
*prev_end_seq_id < start_seq_id,

src/stream/src/from_proto/sink.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -281,6 +281,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
281281
table,
282282
params.vnode_bitmap.clone().map(Arc::new),
283283
65536,
284+
params.env.config().developer.chunk_size,
284285
metrics,
285286
log_store_identity,
286287
pk_info,

src/stream/src/from_proto/sync_log_store.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
6161

6262
let pause_duration_ms = node.pause_duration_ms as _;
6363
let buffer_max_size = node.buffer_size as usize;
64-
let chunk_size = actor_context.streaming_config.developer.chunk_size as u32;
64+
let chunk_size = actor_context.streaming_config.developer.chunk_size;
6565

6666
let executor = SyncedKvLogStoreExecutor::new(
6767
actor_context,

0 commit comments

Comments
 (0)