Skip to content

Commit 2bc19e8

Browse files
committed
fix: 🚑 make it so is_in_bucket is consistent across same file key records
1 parent 527f303 commit 2bc19e8

File tree

5 files changed

+95
-9
lines changed

5 files changed

+95
-9
lines changed

client/indexer-db/migrations/.diesel_lock

Whitespace-only changes.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- This migration cannot be safely reversed because we don't track which specific
2+
-- file records had incorrect is_in_bucket=false values before the normalization.
3+
--
4+
-- Reverting this migration would require arbitrarily setting some records back to
5+
-- is_in_bucket=false, which could recreate the inconsistent state we're trying to fix.
6+
--
7+
-- If you need to rollback, the safest approach is to restore from a database backup
8+
-- taken before this migration was applied.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Normalize is_in_bucket status across all file records with the same file_key
2+
--
3+
-- If ANY file record with a given file_key has is_in_bucket=true, then ALL
4+
-- records for that file_key should have is_in_bucket=true. This is because
5+
-- the bucket forest only contains one instance of each file_key, so if it's
6+
-- in the bucket, all storage request records for that file should reflect that.
7+
8+
UPDATE file
9+
SET is_in_bucket = true
10+
WHERE file_key IN (
11+
-- Find all file_keys where at least one record has is_in_bucket=true
12+
SELECT DISTINCT file_key
13+
FROM file
14+
WHERE is_in_bucket = true
15+
)
16+
AND is_in_bucket = false;

client/indexer-db/src/models/file.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use shc_common::types::{FileMetadata, Fingerprint};
1010

1111
use crate::{
1212
models::{Bucket, MultiAddress},
13-
schema::{bucket, file, file_peer_id},
13+
schema::{bucket, file, file_peer_id, msp_file},
1414
DbConnection,
1515
};
1616

@@ -137,6 +137,7 @@ impl File {
137137
peer_ids: Vec<crate::models::PeerId>,
138138
block_hash: Vec<u8>,
139139
tx_hash: Option<Vec<u8>>,
140+
is_in_bucket: bool,
140141
) -> Result<Self, diesel::result::Error> {
141142
let file = diesel::insert_into(file::table)
142143
.values((
@@ -150,7 +151,7 @@ impl File {
150151
file::step.eq(step as i32),
151152
file::deletion_status.eq(None::<i32>),
152153
file::deletion_signature.eq(None::<Vec<u8>>),
153-
file::is_in_bucket.eq(false),
154+
file::is_in_bucket.eq(is_in_bucket),
154155
file::block_hash.eq(block_hash),
155156
file::tx_hash.eq(tx_hash),
156157
))
@@ -224,6 +225,26 @@ impl File {
224225
Ok(file_record)
225226
}
226227

228+
/// Check if any file record with the given file key is currently in the bucket forest.
229+
///
230+
/// This is useful when creating new file records for repeated storage requests
231+
/// to inherit the bucket membership status from previous requests, since for example if the
232+
/// MSP was already storing the file key, the `MutationsApplied` event won't be emitted for it
233+
/// so if we default `is_in_bucket` to false it would be incorrectly marked as not in the bucket.
234+
pub async fn is_file_key_in_bucket<'a>(
235+
conn: &mut DbConnection<'a>,
236+
file_key: impl AsRef<[u8]>,
237+
) -> Result<bool, diesel::result::Error> {
238+
let file_key = file_key.as_ref().to_vec();
239+
let count: i64 = file::table
240+
.filter(file::file_key.eq(file_key))
241+
.filter(file::is_in_bucket.eq(true))
242+
.count()
243+
.get_result(conn)
244+
.await?;
245+
Ok(count > 0)
246+
}
247+
227248
pub async fn update_step<'a>(
228249
conn: &mut DbConnection<'a>,
229250
file_key: impl AsRef<[u8]>,
@@ -306,6 +327,22 @@ impl File {
306327
Ok(count > 0)
307328
}
308329

330+
/// Check if a file has any MSP associations
331+
///
332+
/// TODO: This check is not used for now, but should be used in the future to prevent the
333+
/// indexer from trying to delete a file that still has associations and getting stuck.
334+
pub async fn has_msp_associations<'a>(
335+
conn: &mut DbConnection<'a>,
336+
file_id: i64,
337+
) -> Result<bool, diesel::result::Error> {
338+
let count: i64 = msp_file::table
339+
.filter(msp_file::file_id.eq(file_id))
340+
.count()
341+
.get_result(conn)
342+
.await?;
343+
Ok(count > 0)
344+
}
345+
309346
/// Delete file only if it has no BSP associations and is not in the bucket forest.
310347
/// The flag [`is_in_bucket`](File::is_in_bucket) is set to false or true based on the [`MutationsApplied`] event emitted by the proofs dealer pallet for catch all.
311348
/// Returns true if all files associated with the file key were deleted, false if any still has associations.
@@ -636,8 +673,10 @@ impl File {
636673
/// Update the bucket membership status for a file.
637674
///
638675
/// Updates `is_in_bucket` based on mutations applied to the bucket's forest.
639-
/// The file is identified by both `file_key` and `onchain_bucket_id` to ensure
640-
/// we're updating the correct file-bucket relationship.
676+
/// The file is identified by both `file_key` and `onchain_bucket_id`.
677+
///
678+
/// This updates all file records with the same file key (for cases where there were
679+
/// multiple storage requests for the same file).
641680
pub async fn update_bucket_membership<'a>(
642681
conn: &mut DbConnection<'a>,
643682
file_key: impl AsRef<[u8]>,
@@ -647,7 +686,8 @@ impl File {
647686
let file_key = file_key.as_ref().to_vec();
648687
let onchain_bucket_id = onchain_bucket_id.as_ref().to_vec();
649688

650-
// Get the file info
689+
// Get the file info (bucket ID and size) from one of the file records since
690+
// all records should have the same values
651691
let file_info: Option<(i64, i64)> = file::table
652692
.filter(file::file_key.eq(&file_key))
653693
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))
@@ -656,7 +696,7 @@ impl File {
656696
.await
657697
.optional()?;
658698

659-
// Update the file's bucket membership status
699+
// Update all file records with this file key to have the new bucket membership status
660700
diesel::update(file::table)
661701
.filter(file::file_key.eq(&file_key))
662702
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))

client/indexer-service/src/handler.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,13 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
385385
let block_hash_bytes = block_hash.as_bytes().to_vec();
386386
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());
387387

388+
// Check if this file key is already present in the bucket of the MSP
389+
// In this scenario, this will always return false, since there's no other file record
390+
// in the DB, but it's still a good practice to check it.
391+
let is_in_bucket =
392+
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec())
393+
.await?;
394+
388395
// Create file with Requested step since we will change it to Stored when the storage request is fulfilled
389396
File::create(
390397
conn,
@@ -399,6 +406,7 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
399406
vec![], // No peer_ids available from confirmation event
400407
block_hash_bytes,
401408
tx_hash_bytes,
409+
is_in_bucket,
402410
)
403411
.await?
404412
}
@@ -445,6 +453,12 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
445453
// Convert EVM tx hash to bytes if present
446454
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());
447455

456+
// Check if this file key is already present in the bucket of the MSP
457+
// This is because the `MutationsApplied` event won't be emitted for this file key when
458+
// the MSP accepts it because the MSP is already storing it.
459+
let is_in_bucket =
460+
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?;
461+
448462
File::create(
449463
conn,
450464
who,
@@ -458,6 +472,7 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
458472
sql_peer_ids,
459473
block_hash_bytes,
460474
tx_hash_bytes,
475+
is_in_bucket,
461476
)
462477
.await?;
463478
}
@@ -547,6 +562,12 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
547562
let block_hash_bytes = block_hash.as_bytes().to_vec();
548563
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());
549564

565+
// Check if this file key is already present in the bucket of the MSP
566+
// This is because the `MutationsApplied` event won't be emitted for this file key because
567+
// the MSP was already storing it.
568+
let is_in_bucket =
569+
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?;
570+
550571
// Create file with Requested step since we will change it to Stored when the storage request is fulfilled
551572
File::create(
552573
conn,
@@ -561,6 +582,7 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
561582
vec![], // No peer_ids available from acceptance event
562583
block_hash_bytes,
563584
tx_hash_bytes,
585+
is_in_bucket,
564586
)
565587
.await?
566588
}
@@ -732,9 +754,9 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
732754
// No storage, safe to delete immediately
733755
File::delete(conn, file_record.id).await?;
734756
log::debug!(
735-
"Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately",
736-
file_key, file_record.id
737-
);
757+
"Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately",
758+
file_key, file_record.id,
759+
);
738760
}
739761
}
740762
}

0 commit comments

Comments
 (0)