@@ -136,7 +136,7 @@ impl Sink for Parquet {
                     destination,
                     payload,
                 } => {
-                    let _span = span!(Level::DEBUG, "Parquet sink recv");
+                    let _span = span!(Level::TRACE, "Parquet sink recv");
 
                     if !buffer.contains_key(&destination) {
                         buffer.insert(destination.clone(), vec![]);
@@ -178,7 +178,8 @@ impl Sink for Parquet {
                         schema.clone(),
                         &destination,
                         &buf,
-                    );
+                    )
+                    .await;
                 } else {
                     info!(
                         "Did not have a schema, so will try inferring one for {destination}!"
@@ -204,7 +205,8 @@ impl Sink for Parquet {
                                 Arc::new(inferred_schema),
                                 &destination,
                                 &buf,
-                            );
+                            )
+                            .await;
                         }
                         Err(err) => {
                             error!(
@@ -231,7 +233,7 @@ impl Sink for Parquet {
 }
 
 /// Write the given buffer to a new `.parquet` file in the [ObjectStore]
-fn flush_to_parquet(
+async fn flush_to_parquet(
     store: ObjectStoreRef,
     schema: Arc<arrow_schema::Schema>,
     destination: &str,
@@ -250,26 +252,39 @@ fn flush_to_parquet(
     let mut decoder = ReaderBuilder::new(schema.clone())
         .build_decoder()
         .expect("Failed to build the JSON decoder");
-    let decoded = decoder.decode(buffer).expect("Failed to deserialize bytes");
-    debug!("Decoded {decoded} bytes");
-
     let output =
         object_store::path::Path::from(format!("{destination}/{}.parquet", Uuid::new_v4()));
-
     let object_writer = ParquetObjectWriter::new(store.clone(), output.clone());
     let mut writer = AsyncArrowWriter::try_new(object_writer, schema.clone(), None)
        .expect("Failed to build AsyncArrowWriter");
 
-    smol::block_on(Compat::new(async {
-        if let Some(batch) = decoder
-            .flush()
-            .expect("Failed to flush bytes to a RecordBatch")
-        {
-            writer.write(&batch).await.expect("Failed to write a batch");
+    let total_bytes = buffer.len();
+    let mut read_bytes = 0;
+
+    Compat::new(async {
+        loop {
+            let decoded = decoder
+                .decode(&buffer[read_bytes..])
+                .expect("Failed to deserialize bytes");
+            debug!("Decoded {decoded} bytes");
+            read_bytes += decoded;
+
+            if let Some(batch) = decoder
+                .flush()
+                .expect("Failed to flush bytes to a RecordBatch")
+            {
+                debug!("Wrote a batch");
+                writer.write(&batch).await.expect("Failed to write a batch");
+            }
+
+            if read_bytes >= total_bytes {
+                break;
+            }
         }
         let file_result = writer.close().await.expect("Failed to close the writer");
         info!("Flushed {} rows to storage", file_result.num_rows);
-    }));
+    })
+    .await;
 }
 
 /// Configuration for [Parquet] sink
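The core of this change is the decode loop: arrow-json's streaming `Decoder::decode` consumes at most enough input to fill its internal batch and returns the number of bytes it read, so a single call, as in the old code, can leave the tail of the buffer undecoded. The sketch below isolates that loop outside the commit; `decode_all` is a hypothetical helper for illustration, assuming the same `arrow-json` `Decoder` API the commit uses.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::Schema;

// Hypothetical helper (not part of this commit): drain an entire buffer
// of JSON through the streaming Decoder.
fn decode_all(schema: Arc<Schema>, buffer: &[u8]) -> Vec<RecordBatch> {
    let mut decoder = ReaderBuilder::new(schema)
        .build_decoder()
        .expect("Failed to build the JSON decoder");

    let mut batches = vec![];
    let mut read_bytes = 0;

    while read_bytes < buffer.len() {
        // `decode` returns how many bytes it consumed, which may be fewer
        // than the remaining input if the internal batch fills up first.
        read_bytes += decoder
            .decode(&buffer[read_bytes..])
            .expect("Failed to deserialize bytes");

        // Emit whatever rows have been buffered so far.
        if let Some(batch) = decoder.flush().expect("Failed to flush") {
            batches.push(batch);
        }
    }

    batches
}

Making `flush_to_parquet` an `async fn` and `.await`-ing the `Compat`-wrapped future, rather than calling `smol::block_on` inside the sink, also keeps the flush from blocking the executor thread while the `AsyncArrowWriter` performs I/O.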