@@ -101,6 +101,7 @@ impl Sink for Parquet {
101101 let timer_tx = self . tx . clone ( ) ;
102102
103103 let _ = smol:: spawn ( async move {
104+ info ! ( "The flush timer has been set up for {interval:?}" ) ;
104105 let mut timer = smol:: Timer :: interval ( interval) ;
105106 while timer. next ( ) . await . is_some ( ) {
106107 debug ! ( "Timer has fired, issuing a flush" ) ;
@@ -129,14 +130,15 @@ impl Sink for Parquet {
129130
130131 loop {
131132 if let Ok ( msg) = self . rx . recv ( ) . await {
132- debug ! ( "Buffering this message for Parquet output: {msg:?}" ) ;
133+ smol :: future :: yield_now ( ) . await ;
133134
134135 match msg {
135136 Message :: Data {
136137 destination,
137138 payload,
138139 } => {
139140 let _span = span ! ( Level :: TRACE , "Parquet sink recv" ) ;
141+ debug ! ( "Buffering this message for Parquet output: {destination}" ) ;
140142
141143 if !buffer. contains_key ( & destination) {
142144 buffer. insert ( destination. clone ( ) , vec ! [ ] ) ;
@@ -155,19 +157,19 @@ impl Sink for Parquet {
155157 queue. extend ( payload. as_bytes ( ) ) ;
156158 queue. extend ( "\n " . as_bytes ( ) ) ;
157159
158- if ( since_last_flush . elapsed ( ) . as_millis ( ) > flush_ms )
159- || ( * bufsize > self . config . buffer )
160- {
161- debug ! (
162- "Reached the threshold to flush bytes for `{}`" ,
163- & destination
164- ) ;
165- let _ = self . tx . send ( Message :: flush ( ) ) . await ;
160+ if let Some ( max_buffer ) = & self . config . buffer {
161+ if ( * bufsize) >= * max_buffer {
162+ debug ! (
163+ "Reached the threshold to flush bytes for `{}`" ,
164+ & destination
165+ ) ;
166+ let _ = self . tx . send ( Message :: flush ( ) ) . await ;
167+ }
166168 }
167169 }
168170 }
169171 Message :: Flush { should_exit } => {
170- info ! ( "Parquet sink has been told to flush" ) ;
172+ debug ! ( "Parquet sink has been told to flush" ) ;
171173
172174 for ( destination, buf) in buffer. drain ( ) {
173175 let _flush_span = span ! ( Level :: INFO , "Parquet flush for" , destination) ;
@@ -294,8 +296,7 @@ pub struct Config {
294296 /// Expected to be an S3 compatible URL
295297 pub url : Url ,
296298 /// Minimum number of bytes to buffer into each parquet file
297- #[ serde( default = "parquet_buffer_default" ) ]
298- pub buffer : usize ,
299+ pub buffer : Option < usize > ,
299300 /// Duration in milliseconds before a flush to storage should happen
300301 #[ serde( default = "parquet_flush_default" ) ]
301302 pub flush_ms : usize ,
@@ -309,14 +310,9 @@ fn parquet_url_default() -> Url {
309310 . expect ( "The S3_OUTPUT_URL could not be parsed as a valid URL" )
310311}
311312
312- /// Default number of log lines per parquet file
313- fn parquet_buffer_default ( ) -> usize {
314- 1_024 * 1_024 * 100
315- }
316-
317313/// Default milliseconds before a Parquet sink flush
318314fn parquet_flush_default ( ) -> usize {
319- 1000 * 10
315+ 1000 * 60
320316}
321317
322318#[ cfg( test) ]
@@ -330,7 +326,6 @@ mod tests {
330326url: 's3://bucket'
331327 "# ;
332328 let parquet: Config = serde_yaml:: from_str ( conf) . expect ( "Failed to deserialize" ) ;
333- assert_eq ! ( parquet. buffer, parquet_buffer_default( ) ) ;
334329 assert_eq ! ( parquet. flush_ms, parquet_flush_default( ) ) ;
335330 assert_eq ! (
336331 parquet. url,
@@ -340,7 +335,6 @@ url: 's3://bucket'
340335
341336 #[ test]
342337 fn test_defaults ( ) {
343- assert ! ( 0 < parquet_buffer_default( ) ) ;
344338 assert ! ( 0 < parquet_flush_default( ) ) ;
345339 }
346340}