Skip to content

Commit 5e53d4c

Browse files
ffarall and TDemeco authored
fix: 🚑 Handle multiple file records for same file_key (#593)
* fix: 🚑 `get_by_file_key` now returns a vector and there's a `get_latest_by_file_key`
* fix: 🚑 Propagate fix to backend functions
* fix: 🚑 ensure file operations in indexer are done by id instead of file key
* fix: 🔥 remove unused import
* fix: 🐛 update `delete_file` of backend tests with file id argument

---------

Co-authored-by: Tobi Demeco <[email protected]>
1 parent cc129dd commit 5e53d4c

File tree

6 files changed

+171
-143
lines changed

6 files changed

+171
-143
lines changed

backend/lib/src/data/indexer_db/repository/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ pub trait IndexerOps: Send + Sync {
111111

112112
/// Retrieve the file identified with the given File Key
113113
///
114+
/// There can be multiple file records for a given file key if there were multiple
115+
/// storage requests for the same file key. We get the oldest one created, which
116+
/// would be the original storage request that first created the file.
117+
/// This is good enough for the purpose of this query.
118+
///
114119
/// # Arguments
115120
/// * `key` - the File Key to search
116121
async fn get_file_by_file_key(&self, file_key: &Hash) -> RepositoryResult<File>;

backend/lib/src/data/indexer_db/repository/postgres.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,11 @@ impl IndexerOps for Repository {
138138
async fn get_file_by_file_key(&self, file_key: &Hash) -> RepositoryResult<File> {
139139
let mut conn = self.pool.get().await?;
140140

141-
File::get_by_file_key(&mut conn, file_key.as_bytes())
141+
// There can be multiple file records for a given file key if there were multiple
142+
// storage requests for the same file key. We get the oldest one created, which
143+
// would be the original storage request that first created the file.
144+
// This is good enough for the purpose of this query.
145+
File::get_oldest_by_file_key(&mut conn, file_key.as_bytes())
142146
.await
143147
.map_err(Into::into)
144148
}
@@ -307,7 +311,8 @@ impl IndexerOpsMut for Repository {
307311
let mut conn = self.pool.get().await?;
308312

309313
// TODO: also clear related associations, like bsp_file
310-
File::delete(&mut conn, file_key.as_bytes()).await?;
314+
let file_record = File::get_latest_by_file_key(&mut conn, file_key.as_bytes()).await?;
315+
File::delete(&mut conn, file_record.id).await?;
311316
Ok(())
312317
}
313318

client/indexer-db/src/models/file.rs

Lines changed: 104 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use shc_common::types::{FileMetadata, Fingerprint};
1010

1111
use crate::{
1212
models::{Bucket, MultiAddress},
13-
schema::{bucket, file, file_peer_id, msp_file},
13+
schema::{bucket, file, file_peer_id},
1414
DbConnection,
1515
};
1616

@@ -169,22 +169,55 @@ impl File {
169169
.execute(conn)
170170
.await?;
171171

172-
// Update bucket total size and file count
173-
Bucket::increment_file_count_and_size(conn, bucket_id, size).await?;
174-
175172
Ok(file)
176173
}
177174

175+
/// Get all file records for a given file key.
176+
///
177+
/// There can be multiple file records for a given file key if there were
178+
/// multiple storage requests for the same file key.
178179
pub async fn get_by_file_key<'a>(
179180
conn: &mut DbConnection<'a>,
180181
file_key: impl AsRef<[u8]>,
182+
) -> Result<Vec<Self>, diesel::result::Error> {
183+
let file_key = file_key.as_ref().to_vec();
184+
let file_records = file::table
185+
.filter(file::file_key.eq(file_key))
186+
.load::<Self>(conn)
187+
.await?;
188+
Ok(file_records)
189+
}
190+
191+
/// Get the most recently created file record for a given file key.
192+
///
193+
/// Returns an error if there are no records for the given key.
194+
pub async fn get_latest_by_file_key<'a>(
195+
conn: &mut DbConnection<'a>,
196+
file_key: impl AsRef<[u8]>,
181197
) -> Result<Self, diesel::result::Error> {
182198
let file_key = file_key.as_ref().to_vec();
183-
let file = file::table
199+
let file_record: Self = file::table
184200
.filter(file::file_key.eq(file_key))
185-
.first::<Self>(conn)
201+
.order(file::created_at.desc())
202+
.first(conn)
186203
.await?;
187-
Ok(file)
204+
Ok(file_record)
205+
}
206+
207+
/// Get the oldest file record for a given file key.
208+
///
209+
/// Returns an error if there are no records for the given key.
210+
pub async fn get_oldest_by_file_key<'a>(
211+
conn: &mut DbConnection<'a>,
212+
file_key: impl AsRef<[u8]>,
213+
) -> Result<Self, diesel::result::Error> {
214+
let file_key = file_key.as_ref().to_vec();
215+
let file_record: Self = file::table
216+
.filter(file::file_key.eq(file_key))
217+
.order(file::created_at.asc())
218+
.first(conn)
219+
.await?;
220+
Ok(file_record)
188221
}
189222

190223
pub async fn update_step<'a>(
@@ -203,29 +236,14 @@ impl File {
203236

204237
pub async fn delete<'a>(
205238
conn: &mut DbConnection<'a>,
206-
file_key: impl AsRef<[u8]>,
239+
file_id: i64,
207240
) -> Result<(), diesel::result::Error> {
208-
let file_key = file_key.as_ref().to_vec();
209-
210-
// Get file info before deletion
211-
let file_info: Option<(i64, i64)> = file::table
212-
.filter(file::file_key.eq(&file_key))
213-
.select((file::bucket_id, file::size))
214-
.first(conn)
215-
.await
216-
.optional()?;
217-
218-
// Delete the file
241+
// Delete the file by its ID
219242
diesel::delete(file::table)
220-
.filter(file::file_key.eq(file_key))
243+
.filter(file::id.eq(file_id))
221244
.execute(conn)
222245
.await?;
223246

224-
// Update bucket counts if file was found
225-
if let Some((bucket_id, file_size)) = file_info {
226-
Bucket::decrement_file_count_and_size(conn, bucket_id, file_size).await?;
227-
}
228-
229247
Ok(())
230248
}
231249

@@ -269,99 +287,68 @@ impl File {
269287
Ok(())
270288
}
271289

272-
/// Check if file has any MSP associations
273-
pub async fn has_msp_associations<'a>(
274-
conn: &mut DbConnection<'a>,
275-
file_key: impl AsRef<[u8]>,
276-
) -> Result<bool, diesel::result::Error> {
277-
let file_key = file_key.as_ref().to_vec();
278-
279-
// Get file ID
280-
let file_id: Option<i64> = file::table
281-
.filter(file::file_key.eq(&file_key))
282-
.select(file::id)
283-
.first(conn)
284-
.await
285-
.optional()?;
286-
287-
if let Some(file_id) = file_id {
288-
let count: i64 = msp_file::table
289-
.filter(msp_file::file_id.eq(file_id))
290-
.count()
291-
.get_result(conn)
292-
.await?;
293-
Ok(count > 0)
294-
} else {
295-
Ok(false)
296-
}
297-
}
298-
299290
/// Check if file has any BSP associations
300291
pub async fn has_bsp_associations<'a>(
301292
conn: &mut DbConnection<'a>,
302-
file_key: impl AsRef<[u8]>,
293+
file_id: i64,
303294
) -> Result<bool, diesel::result::Error> {
304295
use crate::schema::bsp_file;
305296

306-
let file_key = file_key.as_ref().to_vec();
307-
308-
// Get file ID
309-
let file_id: Option<i64> = file::table
310-
.filter(file::file_key.eq(&file_key))
311-
.select(file::id)
312-
.first(conn)
313-
.await
314-
.optional()?;
315-
316-
if let Some(file_id) = file_id {
317-
let count: i64 = bsp_file::table
318-
.filter(bsp_file::file_id.eq(file_id))
319-
.count()
320-
.get_result(conn)
321-
.await?;
322-
Ok(count > 0)
323-
} else {
324-
Ok(false)
325-
}
297+
let count: i64 = bsp_file::table
298+
.filter(bsp_file::file_id.eq(file_id))
299+
.count()
300+
.get_result(conn)
301+
.await?;
302+
Ok(count > 0)
326303
}
327304

328305
/// Delete file only if it has no BSP associations and is not in the bucket forest.
329306
/// The flag [`is_in_bucket`](File::is_in_bucket) is set to false or true based on the [`MutationsApplied`] event emitted by the proofs dealer pallet for catch all.
307+
/// Every file record associated with the file key is checked and deleted independently; whether all of them were deleted is only logged, not returned.
330308
pub async fn delete_if_orphaned<'a>(
331309
conn: &mut DbConnection<'a>,
332310
file_key: impl AsRef<[u8]>,
333-
) -> Result<bool, diesel::result::Error> {
311+
) -> Result<(), diesel::result::Error> {
334312
let file_key = file_key.as_ref();
335313

336314
// Load the file record(s) stored for this file key; the bucket forest and BSP association checks follow below
337-
let file_record: Option<Self> = file::table
315+
let file_records: Vec<Self> = file::table
338316
.filter(file::file_key.eq(file_key))
339-
.first(conn)
340-
.await
341-
.optional()?;
342-
343-
let Some(file_record) = file_record else {
344-
// File doesn't exist, nothing to delete
345-
return Ok(false);
346-
};
317+
.load(conn)
318+
.await?;
347319

348-
let has_bsp = Self::has_bsp_associations(conn, file_key).await?;
349-
let is_in_bucket = file_record.is_in_bucket;
350-
351-
if !is_in_bucket && !has_bsp {
352-
// File is not in bucket forest and has no BSP associations, safe to delete
353-
Self::delete(conn, file_key).await?;
354-
log::debug!("Deleted orphaned file key: {:?}", file_key);
355-
Ok(true)
356-
} else {
357-
log::debug!(
358-
"File with key {:?} still has storage (in_bucket: {}, BSP: {}), keeping it",
359-
file_key,
360-
is_in_bucket,
361-
has_bsp
362-
);
363-
Ok(false)
320+
// For each found file record, delete it only if it is not in the bucket forest and has no BSP associations
321+
let mut deleted_all = true;
322+
for file_record in file_records {
323+
let has_bsp = Self::has_bsp_associations(conn, file_record.id).await?;
324+
let is_in_bucket = file_record.is_in_bucket;
325+
326+
if !is_in_bucket && !has_bsp {
327+
Self::delete(conn, file_record.id).await?;
328+
log::debug!(
329+
"Deleted orphaned file key: {:?} and id: {:?}",
330+
file_record.file_key,
331+
file_record.id
332+
);
333+
} else {
334+
log::debug!(
335+
"File with key {:?} and id {:?} still has storage (in_bucket: {}, BSP: {}), keeping it",
336+
file_record.file_key,
337+
file_record.id,
338+
is_in_bucket,
339+
has_bsp,
340+
);
341+
deleted_all = false;
342+
}
364343
}
344+
345+
log::debug!(
346+
"Deleted all files associated with file key: {:?}: {}",
347+
file_key,
348+
deleted_all
349+
);
350+
351+
Ok(())
365352
}
366353

367354
pub async fn get_by_bucket_id<'a>(
@@ -649,13 +636,31 @@ impl File {
649636
let file_key = file_key.as_ref().to_vec();
650637
let onchain_bucket_id = onchain_bucket_id.as_ref().to_vec();
651638

639+
// Get the bucket ID and size of the first matching file record, needed for the bucket stats update below
640+
let file_info: Option<(i64, i64)> = file::table
641+
.filter(file::file_key.eq(&file_key))
642+
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))
643+
.select((file::bucket_id, file::size))
644+
.first(conn)
645+
.await
646+
.optional()?;
647+
648+
// Update the file's bucket membership status
652649
diesel::update(file::table)
653650
.filter(file::file_key.eq(&file_key))
654651
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))
655652
.set(file::is_in_bucket.eq(is_in_bucket))
656653
.execute(conn)
657654
.await?;
658655

656+
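// Update the bucket stats to match the new bucket membership (NOTE(review): applied even if `is_in_bucket` already had this value — confirm callers never repeat the same value)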
657+
if let Some((bucket_id, file_size)) = file_info {
658+
match is_in_bucket {
659+
true => Bucket::increment_file_count_and_size(conn, bucket_id, file_size).await?,
660+
false => Bucket::decrement_file_count_and_size(conn, bucket_id, file_size).await?,
661+
}
662+
}
663+
659664
Ok(())
660665
}
661666
}

client/indexer-db/src/models/msp_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl MspFile {
5353
// Log if we found multiple files with the same key
5454
if file_ids.len() > 1 {
5555
log::warn!(
56-
"Found {} files with the same file_key: {:?}. This is an inconsistent state. Will proceed to delete all associated file IDs with this key.",
56+
"Found {} files with the same file_key: {:?}. This is expected only if there was more than one storage request for the same file key. Will proceed to delete all associated file IDs with this key.",
5757
file_ids.len(),
5858
file_key
5959
);

0 commit comments

Comments
 (0)