1818// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919
2020use std:: borrow:: Cow ;
21- use std:: collections:: VecDeque ;
2221use std:: fmt:: Display ;
23- use std:: io:: { stdout, Stdout , Write } ;
2422use std:: num:: NonZeroUsize ;
2523use std:: ops:: Div ;
2624use std:: path:: PathBuf ;
2725use std:: str:: FromStr ;
2826use std:: time:: { Duration , Instant } ;
29- use std:: { fmt, io} ;
3027
3128use anyhow:: { anyhow, bail, Context } ;
3229use bytesize:: ByteSize ;
3330use clap:: { arg, Arg , ArgAction , ArgMatches , Command } ;
34- use colored:: { ColoredString , Colorize } ;
35- use humantime:: format_duration;
31+ use colored:: Colorize ;
3632use indicatif:: { ProgressBar , ProgressStyle } ;
3733use itertools:: Itertools ;
3834use numfmt:: { Formatter , Scales } ;
39- use quickwit_actors:: ActorHandle ;
4035use quickwit_common:: tower:: { Rate , RateEstimator , SmaRateEstimator } ;
4136use quickwit_common:: uri:: Uri ;
4237use quickwit_config:: { ConfigFormat , IndexConfig } ;
43- use quickwit_indexing:: models:: IndexingStatistics ;
44- use quickwit_indexing:: IndexingPipeline ;
4538use quickwit_metastore:: { IndexMetadata , Split , SplitState } ;
4639use quickwit_proto:: search:: { CountHits , SortField , SortOrder } ;
4740use quickwit_proto:: types:: IndexId ;
@@ -54,12 +47,11 @@ use tabled::settings::object::{FirstRow, Rows, Segment};
5447use tabled:: settings:: panel:: Footer ;
5548use tabled:: settings:: { Alignment , Disable , Format , Modify , Panel , Rotate , Style } ;
5649use tabled:: { Table , Tabled } ;
57- use thousands:: Separable ;
5850use tracing:: { debug, Level } ;
5951
6052use crate :: checklist:: GREEN_COLOR ;
6153use crate :: stats:: { mean, percentile, std_deviation} ;
62- use crate :: { client_args, make_table, prompt_confirmation, ClientArgs , THROUGHPUT_WINDOW_SIZE } ;
54+ use crate :: { client_args, make_table, prompt_confirmation, ClientArgs } ;
6355
6456pub fn build_index_command ( ) -> Command {
6557 Command :: new ( "index" )
@@ -1143,187 +1135,6 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
11431135 Ok ( ( ) )
11441136}
11451137
1146- /// Starts a tokio task that displays the indexing statistics
1147- /// every once in awhile.
1148- pub async fn start_statistics_reporting_loop (
1149- pipeline_handle : ActorHandle < IndexingPipeline > ,
1150- is_stdin : bool ,
1151- ) -> anyhow:: Result < IndexingStatistics > {
1152- let mut stdout_handle = stdout ( ) ;
1153- let start_time = Instant :: now ( ) ;
1154- let mut throughput_calculator = ThroughputCalculator :: new ( start_time) ;
1155- let mut report_interval = tokio:: time:: interval ( Duration :: from_secs ( 1 ) ) ;
1156-
1157- loop {
1158- // TODO fixme. The way we wait today is a bit lame: if the indexing pipeline exits, we will
1159- // still wait up to an entire heartbeat... Ideally we should select between two
1160- // futures.
1161- report_interval. tick ( ) . await ;
1162- // Try to receive with a timeout of 1 second.
1163- // 1 second is also the frequency at which we update statistic in the console
1164- pipeline_handle. refresh_observe ( ) ;
1165-
1166- let observation = pipeline_handle. last_observation ( ) ;
1167-
1168- // Let's not display live statistics to allow screen to scroll.
1169- if observation. num_docs > 0 {
1170- display_statistics ( & mut stdout_handle, & mut throughput_calculator, & observation) ?;
1171- }
1172-
1173- if pipeline_handle. state ( ) . is_exit ( ) {
1174- break ;
1175- }
1176- }
1177- let ( pipeline_exit_status, pipeline_statistics) = pipeline_handle. join ( ) . await ;
1178- if !pipeline_exit_status. is_success ( ) {
1179- bail ! ( pipeline_exit_status) ;
1180- }
1181- // If we have received zero docs at this point,
1182- // there is no point in displaying report.
1183- if pipeline_statistics. num_docs == 0 {
1184- return Ok ( pipeline_statistics) ;
1185- }
1186-
1187- if is_stdin {
1188- display_statistics (
1189- & mut stdout_handle,
1190- & mut throughput_calculator,
1191- & pipeline_statistics,
1192- ) ?;
1193- }
1194- // display end of task report
1195- println ! ( ) ;
1196- let secs = Duration :: from_secs ( start_time. elapsed ( ) . as_secs ( ) ) ;
1197- if pipeline_statistics. num_invalid_docs == 0 {
1198- println ! (
1199- "Indexed {} documents in {}." ,
1200- pipeline_statistics. num_docs. separate_with_commas( ) ,
1201- format_duration( secs)
1202- ) ;
1203- } else {
1204- let num_indexed_docs = ( pipeline_statistics. num_docs
1205- - pipeline_statistics. num_invalid_docs )
1206- . separate_with_commas ( ) ;
1207-
1208- let error_rate = ( pipeline_statistics. num_invalid_docs as f64
1209- / pipeline_statistics. num_docs as f64 )
1210- * 100.0 ;
1211-
1212- println ! (
1213- "Indexed {} out of {} documents in {}. Failed to index {} document(s). {}\n " ,
1214- num_indexed_docs,
1215- pipeline_statistics. num_docs. separate_with_commas( ) ,
1216- format_duration( secs) ,
1217- pipeline_statistics. num_invalid_docs. separate_with_commas( ) ,
1218- colorize_error_rate( error_rate) ,
1219- ) ;
1220- }
1221-
1222- Ok ( pipeline_statistics)
1223- }
1224-
1225- fn colorize_error_rate ( error_rate : f64 ) -> ColoredString {
1226- let error_rate_message = format ! ( "({error_rate:.1}% error rate)" ) ;
1227- if error_rate < 1.0 {
1228- error_rate_message. yellow ( )
1229- } else if error_rate < 5.0 {
1230- error_rate_message. truecolor ( 255 , 181 , 46 ) //< Orange
1231- } else {
1232- error_rate_message. red ( )
1233- }
1234- }
1235-
1236- /// A struct to print data on the standard output.
1237- struct Printer < ' a > {
1238- pub stdout : & ' a mut Stdout ,
1239- }
1240-
1241- impl Printer < ' _ > {
1242- pub fn print_header ( & mut self , header : & str ) -> io:: Result < ( ) > {
1243- write ! ( & mut self . stdout, " {}" , header. bright_blue( ) ) ?;
1244- Ok ( ( ) )
1245- }
1246-
1247- pub fn print_value ( & mut self , fmt_args : fmt:: Arguments ) -> io:: Result < ( ) > {
1248- write ! ( & mut self . stdout, " {fmt_args}" )
1249- }
1250-
1251- pub fn flush ( & mut self ) -> io:: Result < ( ) > {
1252- self . stdout . flush ( )
1253- }
1254- }
1255-
1256- fn display_statistics (
1257- stdout : & mut Stdout ,
1258- throughput_calculator : & mut ThroughputCalculator ,
1259- statistics : & IndexingStatistics ,
1260- ) -> anyhow:: Result < ( ) > {
1261- let elapsed_duration = time:: Duration :: try_from ( throughput_calculator. elapsed_time ( ) ) ?;
1262- let elapsed_time = format ! (
1263- "{:02}:{:02}:{:02}" ,
1264- elapsed_duration. whole_hours( ) ,
1265- elapsed_duration. whole_minutes( ) % 60 ,
1266- elapsed_duration. whole_seconds( ) % 60
1267- ) ;
1268- let throughput_mb_s = throughput_calculator. calculate ( statistics. total_bytes_processed ) ;
1269- let mut printer = Printer { stdout } ;
1270- printer. print_header ( "Num docs" ) ?;
1271- printer. print_value ( format_args ! ( "{:>7}" , statistics. num_docs) ) ?;
1272- printer. print_header ( "Parse errs" ) ?;
1273- printer. print_value ( format_args ! ( "{:>5}" , statistics. num_invalid_docs) ) ?;
1274- printer. print_header ( "PublSplits" ) ?;
1275- printer. print_value ( format_args ! ( "{:>3}" , statistics. num_published_splits) ) ?;
1276- printer. print_header ( "Input size" ) ?;
1277- printer. print_value ( format_args ! (
1278- "{:>5}MB" ,
1279- statistics. total_bytes_processed / 1_000_000
1280- ) ) ?;
1281- printer. print_header ( "Thrghput" ) ?;
1282- printer. print_value ( format_args ! ( "{throughput_mb_s:>5.2}MB/s" ) ) ?;
1283- printer. print_header ( "Time" ) ?;
1284- printer. print_value ( format_args ! ( "{elapsed_time}\n " ) ) ?;
1285- printer. flush ( ) ?;
1286- Ok ( ( ) )
1287- }
1288-
1289- /// ThroughputCalculator is used to calculate throughput.
1290- struct ThroughputCalculator {
1291- /// Stores the time series of processed bytes value.
1292- processed_bytes_values : VecDeque < ( Instant , u64 ) > ,
1293- /// Store the time this calculator started
1294- start_time : Instant ,
1295- }
1296-
1297- impl ThroughputCalculator {
1298- /// Creates new instance.
1299- pub fn new ( start_time : Instant ) -> Self {
1300- let processed_bytes_values: VecDeque < ( Instant , u64 ) > = ( 0 ..THROUGHPUT_WINDOW_SIZE )
1301- . map ( |_| ( start_time, 0u64 ) )
1302- . collect ( ) ;
1303- Self {
1304- processed_bytes_values,
1305- start_time,
1306- }
1307- }
1308-
1309- /// Calculates the throughput.
1310- pub fn calculate ( & mut self , current_processed_bytes : u64 ) -> f64 {
1311- self . processed_bytes_values . pop_front ( ) ;
1312- let current_instant = Instant :: now ( ) ;
1313- let ( first_instant, first_processed_bytes) = * self . processed_bytes_values . front ( ) . unwrap ( ) ;
1314- let elapsed_time = ( current_instant - first_instant) . as_millis ( ) as f64 / 1_000f64 ;
1315- self . processed_bytes_values
1316- . push_back ( ( current_instant, current_processed_bytes) ) ;
1317- ( current_processed_bytes - first_processed_bytes) as f64
1318- / 1_000_000f64
1319- / elapsed_time. max ( 1f64 )
1320- }
1321-
1322- pub fn elapsed_time ( & self ) -> Duration {
1323- self . start_time . elapsed ( )
1324- }
1325- }
1326-
13271138#[ cfg( test) ]
13281139mod test {
13291140
0 commit comments