@@ -25,7 +25,6 @@ use anyhow::{bail, Context};
2525use chitchat:: transport:: ChannelTransport ;
2626use chitchat:: FailureDetectorConfig ;
2727use quickwit_actors:: { ActorHandle , Mailbox , Universe } ;
28- use quickwit_cli:: run_index_checklist;
2928use quickwit_cluster:: { Cluster , ClusterMember } ;
3029use quickwit_common:: pubsub:: EventBroker ;
3130use quickwit_common:: runtimes:: RuntimesConfig ;
@@ -34,7 +33,7 @@ use quickwit_config::merge_policy_config::MergePolicyConfig;
3433use quickwit_config:: service:: QuickwitService ;
3534use quickwit_config:: {
3635 load_index_config_from_user_config, ConfigFormat , IndexConfig , NodeConfig , SourceConfig ,
37- SourceInputFormat , SourceParams , TransformConfig , CLI_SOURCE_ID ,
36+ SourceInputFormat , SourceParams , TransformConfig ,
3837} ;
3938use quickwit_index_management:: IndexService ;
4039use quickwit_indexing:: actors:: {
@@ -44,10 +43,13 @@ use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, Spa
4443use quickwit_indexing:: IndexingPipeline ;
4544use quickwit_ingest:: IngesterPool ;
4645use quickwit_janitor:: { start_janitor_service, JanitorService } ;
47- use quickwit_metastore:: CreateIndexRequestExt ;
46+ use quickwit_metastore:: {
47+ CreateIndexRequestExt , CreateIndexResponseExt , IndexMetadata , IndexMetadataResponseExt ,
48+ } ;
4849use quickwit_proto:: indexing:: CpuCapacity ;
4950use quickwit_proto:: metastore:: {
50- CreateIndexRequest , MetastoreError , MetastoreService , MetastoreServiceClient ,
51+ CreateIndexRequest , IndexMetadataRequest , MetastoreError , MetastoreService ,
52+ MetastoreServiceClient , ResetSourceCheckpointRequest ,
5153} ;
5254use quickwit_proto:: types:: { NodeId , PipelineUid } ;
5355use quickwit_search:: SearchJobPlacer ;
@@ -56,7 +58,11 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme
5658use tracing:: { debug, info, instrument} ;
5759
5860use crate :: environment:: INDEX_ID ;
59- use crate :: indexer:: environment:: { DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI } ;
61+ use crate :: indexer:: environment:: {
62+ DISABLE_JANITOR , DISABLE_MERGE , INDEX_CONFIG_URI , MAX_CHECKPOINTS ,
63+ } ;
64+
65+ const LAMBDA_SOURCE_ID : & str = "_ingest-lambda-source" ;
6066
6167/// The indexing service needs to update its cluster chitchat state so that the control plane is
6268/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
@@ -131,66 +137,71 @@ pub(super) async fn send_telemetry() {
131137 quickwit_telemetry:: send_telemetry_event ( TelemetryEvent :: RunCommand ) . await ;
132138}
133139
134- pub ( super ) fn configure_source (
140+ /// Convert the incomming file path to a source config
141+ pub ( super ) async fn configure_source (
135142 input_path : PathBuf ,
136143 input_format : SourceInputFormat ,
137144 vrl_script : Option < String > ,
138- ) -> SourceConfig {
139- let source_params = SourceParams :: file ( input_path) ;
145+ ) -> anyhow:: Result < SourceConfig > {
140146 let transform_config = vrl_script. map ( |vrl_script| TransformConfig :: new ( vrl_script, None ) ) ;
141- SourceConfig {
142- source_id : CLI_SOURCE_ID . to_string ( ) ,
147+ let source_params = SourceParams :: file ( input_path. clone ( ) ) ;
148+ Ok ( SourceConfig {
149+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
143150 num_pipelines : NonZeroUsize :: new ( 1 ) . expect ( "1 is always non-zero." ) ,
144151 enabled : true ,
145152 source_params,
146153 transform_config,
147154 input_format,
148- }
155+ } )
149156}
150157
151158/// Check if the index exists, creating or overwriting it if necessary
152159pub ( super ) async fn init_index_if_necessary (
153160 metastore : & mut MetastoreServiceClient ,
154161 storage_resolver : & StorageResolver ,
155- source_config : & SourceConfig ,
156162 default_index_root_uri : & Uri ,
157163 overwrite : bool ,
158- ) -> anyhow:: Result < ( ) > {
159- let checklist_result =
160- run_index_checklist ( metastore, storage_resolver, & INDEX_ID , Some ( source_config) ) . await ;
161- if let Err ( e) = checklist_result {
162- let is_not_found = e
163- . downcast_ref ( )
164- . is_some_and ( |meta_error| matches ! ( meta_error, MetastoreError :: NotFound ( _) ) ) ;
165- if !is_not_found {
166- bail ! ( e) ;
164+ ) -> anyhow:: Result < IndexMetadata > {
165+ let metadata_result = metastore
166+ . index_metadata ( IndexMetadataRequest :: for_index_id ( INDEX_ID . clone ( ) ) )
167+ . await ;
168+ let metadata = match metadata_result {
169+ Ok ( _) if overwrite => {
170+ info ! (
171+ index_id = * INDEX_ID ,
172+ "Overwrite enabled, clearing existing index" ,
173+ ) ;
174+ let mut index_service = IndexService :: new ( metastore. clone ( ) , storage_resolver. clone ( ) ) ;
175+ index_service. clear_index ( & INDEX_ID ) . await ?;
176+ metastore
177+ . index_metadata ( IndexMetadataRequest :: for_index_id ( INDEX_ID . clone ( ) ) )
178+ . await ?
179+ . deserialize_index_metadata ( ) ?
167180 }
168- info ! (
169- index_id = * INDEX_ID ,
170- index_config_uri = * INDEX_CONFIG_URI ,
171- "Index not found, creating it"
172- ) ;
173- let index_config = load_index_config ( storage_resolver, default_index_root_uri) . await ?;
174- if index_config. index_id != * INDEX_ID {
175- bail ! (
176- "Expected index ID was {} but config file had {}" ,
177- * INDEX_ID ,
178- index_config. index_id,
181+ Ok ( metadata_resp) => metadata_resp. deserialize_index_metadata ( ) ?,
182+ Err ( MetastoreError :: NotFound ( _) ) => {
183+ info ! (
184+ index_id = * INDEX_ID ,
185+ index_config_uri = * INDEX_CONFIG_URI ,
186+ "Index not found, creating it"
179187 ) ;
188+ let index_config = load_index_config ( storage_resolver, default_index_root_uri) . await ?;
189+ if index_config. index_id != * INDEX_ID {
190+ bail ! (
191+ "Expected index ID was {} but config file had {}" ,
192+ * INDEX_ID ,
193+ index_config. index_id,
194+ ) ;
195+ }
196+ let create_resp = metastore
197+ . create_index ( CreateIndexRequest :: try_from_index_config ( & index_config) ?)
198+ . await ?;
199+ info ! ( "index created" ) ;
200+ create_resp. deserialize_index_metadata ( ) ?
180201 }
181- metastore
182- . create_index ( CreateIndexRequest :: try_from_index_config ( & index_config) ?)
183- . await ?;
184- info ! ( "index created" ) ;
185- } else if overwrite {
186- info ! (
187- index_id = * INDEX_ID ,
188- "Overwrite enabled, clearing existing index" ,
189- ) ;
190- let mut index_service = IndexService :: new ( metastore. clone ( ) , storage_resolver. clone ( ) ) ;
191- index_service. clear_index ( & INDEX_ID ) . await ?;
192- }
193- Ok ( ( ) )
202+ Err ( e) => bail ! ( e) ,
203+ } ;
204+ Ok ( metadata)
194205}
195206
196207pub ( super ) async fn spawn_services (
@@ -249,6 +260,7 @@ pub(super) async fn spawn_services(
249260 Ok ( ( indexing_service_handle, janitor_service_opt) )
250261}
251262
263+ /// Spawn and split an indexing pipeline
252264pub ( super ) async fn spawn_pipelines (
253265 indexing_server_mailbox : & Mailbox < IndexingService > ,
254266 source_config : SourceConfig ,
@@ -271,6 +283,43 @@ pub(super) async fn spawn_pipelines(
271283 Ok ( ( indexing_pipeline_handle, merge_pipeline_handle) )
272284}
273285
286+ /// Prune old Lambda file checkpoints if there are too many
287+ ///
288+ /// Without pruning checkpoints accumulate indifinitely. This is particularly
289+ /// problematic when indexing a lot of small files, as the metastore will grow
290+ /// large even for a small index.
291+ ///
292+ /// The current implementation just deletes all checkpoints if there are more
293+ /// than QW_LAMBDA_MAX_CHECKPOINTS. When this purging is performed, the Lambda
294+ /// indexer might ingest the same file again if it receives a duplicate
295+ /// notification.
296+ pub ( super ) async fn prune_lambda_source (
297+ metastore : & mut MetastoreServiceClient ,
298+ index_metadata : IndexMetadata ,
299+ ) -> anyhow:: Result < ( ) > {
300+ let lambda_checkpoint_opt = index_metadata
301+ . checkpoint
302+ . source_checkpoint ( LAMBDA_SOURCE_ID ) ;
303+
304+ if let Some ( lambda_checkpoint) = lambda_checkpoint_opt {
305+ if lambda_checkpoint. num_partitions ( ) > * MAX_CHECKPOINTS {
306+ info ! (
307+ partitions = lambda_checkpoint. num_partitions( ) ,
308+ "prune Lambda checkpoints"
309+ ) ;
310+ metastore
311+ . reset_source_checkpoint ( ResetSourceCheckpointRequest {
312+ index_uid : Some ( index_metadata. index_uid . clone ( ) ) ,
313+ source_id : LAMBDA_SOURCE_ID . to_owned ( ) ,
314+ } )
315+ . await ?;
316+ }
317+ }
318+
319+ Ok ( ( ) )
320+ }
321+
322+ /// Observe the merge pipeline until there are no more ongoing merges
274323pub ( super ) async fn wait_for_merges (
275324 merge_pipeline_handle : ActorHandle < MergePipeline > ,
276325) -> anyhow:: Result < ( ) > {
0 commit comments