File tree 1 file changed +19
-7
lines changed
1 file changed +19
-7
lines changed Original file line number Diff line number Diff line change @@ -339,20 +339,32 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
339
339
} else {
340
340
chunks
341
341
} ;
342
- let chunks = if re_construct_with_sink_pk {
343
- StreamChunkCompactor :: new ( down_stream_pk. clone ( ) , chunks)
342
+ if re_construct_with_sink_pk {
343
+ let chunks = StreamChunkCompactor :: new ( down_stream_pk. clone ( ) , chunks)
344
344
. reconstructed_compacted_chunks (
345
345
chunk_size,
346
346
input_data_types. clone ( ) ,
347
347
sink_type != SinkType :: ForceAppendOnly ,
348
- )
348
+ ) ;
349
+ for c in chunks {
350
+ yield Message :: Chunk ( c) ;
351
+ }
349
352
} else {
350
- chunks
353
+ let mut chunk_builder =
354
+ StreamChunkBuilder :: new ( chunk_size, input_data_types. clone ( ) ) ;
355
+ for chunk in chunks {
356
+ for ( op, row) in chunk. rows ( ) {
357
+ if let Some ( c) = chunk_builder. append_row ( op, row) {
358
+ yield Message :: Chunk ( c) ;
359
+ }
360
+ }
361
+ }
362
+
363
+ if let Some ( c) = chunk_builder. take ( ) {
364
+ yield Message :: Chunk ( c) ;
365
+ }
351
366
} ;
352
367
353
- for c in chunks {
354
- yield Message :: Chunk ( c) ;
355
- }
356
368
if let Some ( w) = mem:: take ( & mut watermark) {
357
369
yield Message :: Watermark ( w)
358
370
}
You can’t perform that action at this time.
0 commit comments