Skip to content

Commit dcbe67a

Browse files
authored
Move garbage collector logic to index-management (#3708)
1 parent 06ac446 commit dcbe67a

File tree

8 files changed

+48
-51
lines changed

8 files changed

+48
-51
lines changed

quickwit/Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-index-management/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ async-trait = { workspace = true }
1515
byte-unit = { workspace = true }
1616
futures = { workspace = true }
1717
futures-util = { workspace = true }
18+
itertools = { workspace = true }
1819
rand = { workspace = true }
1920
serde = { workspace = true }
2021
serde_json = "1.0"
2122
tantivy = { workspace = true }
2223
tempfile = { workspace = true }
24+
time = { workspace = true }
2325
thiserror = { workspace = true }
2426
tokio = { workspace = true }
2527
tokio-stream = { workspace = true }
@@ -31,7 +33,6 @@ quickwit-config = { workspace = true }
3133
quickwit-directories = { workspace = true }
3234
quickwit-doc-mapper = { workspace = true }
3335
quickwit-indexing = { workspace = true }
34-
quickwit-janitor = { workspace = true }
3536
quickwit-metastore = { workspace = true }
3637
quickwit-proto = { workspace = true }
3738
quickwit-storage = { workspace = true }

quickwit/quickwit-janitor/src/garbage_collection.rs renamed to quickwit/quickwit-index-management/src/garbage_collection.rs

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ use std::sync::Arc;
2323
use std::time::Duration;
2424

2525
use futures::Future;
26-
use quickwit_actors::ActorContext;
27-
use quickwit_common::PrettySample;
26+
use quickwit_common::{PrettySample, Progress};
2827
use quickwit_metastore::{
2928
ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
3029
};
@@ -34,8 +33,6 @@ use thiserror::Error;
3433
use time::OffsetDateTime;
3534
use tracing::{error, instrument};
3635

37-
use crate::actors::GarbageCollector;
38-
3936
/// The maximum number of splits that the GC should delete per attempt.
4037
const DELETE_SPLITS_BATCH_SIZE: usize = 1000;
4138

@@ -51,17 +48,14 @@ pub struct DeleteSplitsError {
5148
metastore_failures: Vec<SplitInfo>,
5249
}
5350

54-
async fn protect_future<Fut, T>(
55-
ctx_opt: Option<&ActorContext<GarbageCollector>>,
56-
future: Fut,
57-
) -> T
58-
where
59-
Fut: Future<Output = T>,
60-
{
61-
if let Some(ctx) = ctx_opt {
62-
ctx.protect_future(future).await
63-
} else {
64-
future.await
51+
async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
52+
where Fut: Future<Output = T> {
53+
match progress {
54+
None => future.await,
55+
Some(progress) => {
56+
let _guard = progress.protect_zone();
57+
future.await
58+
}
6559
}
6660
}
6761

@@ -83,15 +77,15 @@ pub struct SplitRemovalInfo {
8377
/// * `deletion_grace_period` - Threshold period after which a marked as deleted split can be
8478
/// safely deleted.
8579
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
86-
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
80+
/// * `progress` - For reporting progress (useful when called from within a quickwit actor).
8781
pub async fn run_garbage_collect(
8882
index_uid: IndexUid,
8983
storage: Arc<dyn Storage>,
9084
metastore: Arc<dyn Metastore>,
9185
staged_grace_period: Duration,
9286
deletion_grace_period: Duration,
9387
dry_run: bool,
94-
ctx_opt: Option<&ActorContext<GarbageCollector>>,
88+
progress_opt: Option<&Progress>,
9589
) -> anyhow::Result<SplitRemovalInfo> {
9690
// Select staged splits with staging timestamp older than grace period timestamp.
9791
let grace_period_timestamp =
@@ -102,7 +96,7 @@ pub async fn run_garbage_collect(
10296
.with_update_timestamp_lte(grace_period_timestamp);
10397

10498
let deletable_staged_splits: Vec<SplitMetadata> =
105-
protect_future(ctx_opt, metastore.list_splits(query))
99+
protect_future(progress_opt, metastore.list_splits(query))
106100
.await?
107101
.into_iter()
108102
.map(|meta| meta.split_metadata)
@@ -112,11 +106,12 @@ pub async fn run_garbage_collect(
112106
let query = ListSplitsQuery::for_index(index_uid.clone())
113107
.with_split_state(SplitState::MarkedForDeletion);
114108

115-
let mut splits_marked_for_deletion = protect_future(ctx_opt, metastore.list_splits(query))
116-
.await?
117-
.into_iter()
118-
.map(|split| split.split_metadata)
119-
.collect::<Vec<_>>();
109+
let mut splits_marked_for_deletion =
110+
protect_future(progress_opt, metastore.list_splits(query))
111+
.await?
112+
.into_iter()
113+
.map(|split| split.split_metadata)
114+
.collect::<Vec<_>>();
120115
splits_marked_for_deletion.extend(deletable_staged_splits);
121116

122117
let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
@@ -136,7 +131,7 @@ pub async fn run_garbage_collect(
136131
.collect();
137132
if !split_ids.is_empty() {
138133
protect_future(
139-
ctx_opt,
134+
progress_opt,
140135
metastore.mark_splits_for_deletion(index_uid.clone(), &split_ids),
141136
)
142137
.await?;
@@ -152,14 +147,13 @@ pub async fn run_garbage_collect(
152147
updated_before_timestamp,
153148
storage,
154149
metastore,
155-
ctx_opt,
150+
progress_opt,
156151
)
157152
.await;
158153

159154
Ok(deleted_splits)
160155
}
161-
162-
#[instrument(skip(storage, metastore, ctx_opt))]
156+
#[instrument(skip(storage, metastore, progress_opt))]
163157
/// Removes any splits marked for deletion which haven't been
164158
/// updated after `updated_before_timestamp` in batches of 1000 splits.
165159
///
@@ -170,7 +164,7 @@ async fn delete_splits_marked_for_deletion(
170164
updated_before_timestamp: i64,
171165
storage: Arc<dyn Storage>,
172166
metastore: Arc<dyn Metastore>,
173-
ctx_opt: Option<&ActorContext<GarbageCollector>>,
167+
progress_opt: Option<&Progress>,
174168
) -> SplitRemovalInfo {
175169
let mut removed_splits = Vec::new();
176170
let mut failed_splits = Vec::new();
@@ -181,7 +175,7 @@ async fn delete_splits_marked_for_deletion(
181175
.with_update_timestamp_lte(updated_before_timestamp)
182176
.with_limit(DELETE_SPLITS_BATCH_SIZE);
183177

184-
let list_splits_result = protect_future(ctx_opt, metastore.list_splits(query)).await;
178+
let list_splits_result = protect_future(progress_opt, metastore.list_splits(query)).await;
185179

186180
let splits_to_delete = match list_splits_result {
187181
Ok(splits) => splits,
@@ -205,7 +199,7 @@ async fn delete_splits_marked_for_deletion(
205199
storage.clone(),
206200
metastore.clone(),
207201
splits_to_delete,
208-
ctx_opt,
202+
progress_opt,
209203
)
210204
.await;
211205

@@ -234,13 +228,13 @@ async fn delete_splits_marked_for_deletion(
234228
/// * `storage - The storage managing the target index.
235229
/// * `metastore` - The metastore managing the target index.
236230
/// * `splits` - The list of splits to delete.
237-
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
231+
/// * `progress` - For reporting progress (useful when called from within a quickwit actor).
238232
pub async fn delete_splits_from_storage_and_metastore(
239233
index_uid: IndexUid,
240234
storage: Arc<dyn Storage>,
241235
metastore: Arc<dyn Metastore>,
242236
splits: Vec<SplitMetadata>,
243-
ctx_opt: Option<&ActorContext<GarbageCollector>>,
237+
progress_opt: Option<&Progress>,
244238
) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
245239
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());
246240

@@ -252,10 +246,10 @@ pub async fn delete_splits_from_storage_and_metastore(
252246
.keys()
253247
.map(|split_path_buf| split_path_buf.as_path())
254248
.collect::<Vec<&Path>>();
255-
let delete_result = protect_future(ctx_opt, storage.bulk_delete(&split_paths)).await;
249+
let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;
256250

257-
if let Some(ctx) = ctx_opt {
258-
ctx.record_progress();
251+
if let Some(progress) = progress_opt {
252+
progress.record_progress();
259253
}
260254
let mut successes = Vec::with_capacity(split_infos.len());
261255
let mut storage_error: Option<BulkDeleteError> = None;
@@ -292,7 +286,7 @@ pub async fn delete_splits_from_storage_and_metastore(
292286
.map(|split_info| split_info.split_id.as_str())
293287
.collect();
294288
let metastore_result = protect_future(
295-
ctx_opt,
289+
progress_opt,
296290
metastore.delete_splits(index_uid.clone(), &split_ids),
297291
)
298292
.await;

quickwit/quickwit-index-management/src/index.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ use std::time::Duration;
2424
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
2525
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
2626
use quickwit_indexing::check_source_connectivity;
27-
use quickwit_janitor::{
28-
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
29-
SplitRemovalInfo,
30-
};
3127
use quickwit_metastore::{
3228
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
3329
};
@@ -36,6 +32,11 @@ use quickwit_storage::{StorageResolver, StorageResolverError};
3632
use thiserror::Error;
3733
use tracing::{error, info};
3834

35+
use crate::garbage_collection::{
36+
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
37+
SplitRemovalInfo,
38+
};
39+
3940
#[derive(Error, Debug)]
4041
pub enum IndexServiceError {
4142
#[error("Failed to resolve the storage `{0}`.")]
@@ -351,8 +352,10 @@ pub async fn validate_storage_uri(
351352

352353
#[cfg(test)]
353354
mod tests {
355+
354356
use quickwit_common::uri::Uri;
355-
use quickwit_metastore::metastore_for_test;
357+
use quickwit_config::IndexConfig;
358+
use quickwit_metastore::{metastore_for_test, SplitMetadata};
356359
use quickwit_storage::PutPayload;
357360

358361
use super::*;

quickwit/quickwit-index-management/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
// You should have received a copy of the GNU Affero General Public License
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

20+
mod garbage_collection;
2021
mod index;
2122

23+
pub use garbage_collection::run_garbage_collect;
2224
pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};

quickwit/quickwit-janitor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ utoipa = { workspace = true }
3030
quickwit-actors = { workspace = true }
3131
quickwit-cluster = { workspace = true }
3232
quickwit-common = { workspace = true }
33+
quickwit-index-management = { workspace = true }
3334
quickwit-config = { workspace = true }
3435
quickwit-directories = { workspace = true }
3536
quickwit-doc-mapper = { workspace = true }

quickwit/quickwit-janitor/src/actors/garbage_collector.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@ use async_trait::async_trait;
2626
use futures::{stream, StreamExt};
2727
use itertools::Itertools;
2828
use quickwit_actors::{Actor, ActorContext, Handler};
29+
use quickwit_index_management::run_garbage_collect;
2930
use quickwit_metastore::Metastore;
3031
use quickwit_storage::StorageResolver;
3132
use serde::Serialize;
3233
use tracing::{error, info};
3334

34-
use crate::garbage_collection::run_garbage_collect;
35-
3635
const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
3736

3837
/// Staged files needs to be deleted if there was a failure.
@@ -122,7 +121,7 @@ impl GarbageCollector {
122121
STAGED_GRACE_PERIOD,
123122
DELETION_GRACE_PERIOD,
124123
false,
125-
Some(ctx),
124+
Some(ctx.progress()),
126125
).await;
127126
Some((index_uid, gc_res))
128127
}}).buffer_unordered(MAX_CONCURRENT_GC_TASKS);

quickwit/quickwit-janitor/src/lib.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,12 @@ use tracing::info;
3030

3131
pub mod actors;
3232
pub mod error;
33-
mod garbage_collection;
3433
mod janitor_service;
3534
mod metrics;
3635
mod retention_policy_execution;
3736

3837
pub use janitor_service::JanitorService;
3938

40-
pub use self::garbage_collection::{
41-
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
42-
SplitRemovalInfo,
43-
};
4439
use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};
4540

4641
#[derive(utoipa::OpenApi)]

0 commit comments

Comments
 (0)