diff --git a/backend/lib/src/data/indexer_db/repository/postgres.rs b/backend/lib/src/data/indexer_db/repository/postgres.rs index 117f1a65d..195967a75 100644 --- a/backend/lib/src/data/indexer_db/repository/postgres.rs +++ b/backend/lib/src/data/indexer_db/repository/postgres.rs @@ -301,6 +301,7 @@ impl IndexerOpsMut for Repository { vec![], // No peer_ids for simple test data vec![0u8; 32], // Placeholder block hash for test data None, // No transaction hash for test data + false, // Default to not in bucket for test data ) .await?; diff --git a/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/down.sql b/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/down.sql new file mode 100644 index 000000000..0c7bce300 --- /dev/null +++ b/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/down.sql @@ -0,0 +1,8 @@ +-- This migration cannot be safely reversed because we don't track which specific +-- file records had incorrect is_in_bucket=false values before the normalization. +-- +-- Reverting this migration would require arbitrarily setting some records back to +-- is_in_bucket=false, which could recreate the inconsistent state we're trying to fix. +-- +-- If you need to rollback, the safest approach is to restore from a database backup +-- taken before this migration was applied. 
diff --git a/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/up.sql b/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/up.sql new file mode 100644 index 000000000..bff4c4386 --- /dev/null +++ b/client/indexer-db/migrations/2025-12-05-191030_normalize_is_in_bucket_across_file_records/up.sql @@ -0,0 +1,16 @@ +-- Normalize is_in_bucket status across all file records with the same file_key +-- +-- If ANY file record with a given file_key has is_in_bucket=true, then ALL +-- records for that file_key should have is_in_bucket=true. This is because +-- the bucket forest only contains one instance of each file_key, so if it's +-- in the bucket, all storage request records for that file should reflect that. + +UPDATE file +SET is_in_bucket = true +WHERE file_key IN ( + -- Find all file_keys where at least one record has is_in_bucket=true + SELECT DISTINCT file_key + FROM file + WHERE is_in_bucket = true +) +AND is_in_bucket = false; diff --git a/client/indexer-db/src/models/file.rs b/client/indexer-db/src/models/file.rs index 32fb3caf1..0a3a20eda 100644 --- a/client/indexer-db/src/models/file.rs +++ b/client/indexer-db/src/models/file.rs @@ -10,7 +10,7 @@ use shc_common::types::{FileMetadata, Fingerprint}; use crate::{ models::{Bucket, MultiAddress}, - schema::{bucket, file, file_peer_id}, + schema::{bucket, file, file_peer_id, msp_file}, DbConnection, }; @@ -137,6 +137,7 @@ impl File { peer_ids: Vec, block_hash: Vec, tx_hash: Option>, + is_in_bucket: bool, ) -> Result { let file = diesel::insert_into(file::table) .values(( @@ -150,7 +151,7 @@ impl File { file::step.eq(step as i32), file::deletion_status.eq(None::), file::deletion_signature.eq(None::>), - file::is_in_bucket.eq(false), + file::is_in_bucket.eq(is_in_bucket), file::block_hash.eq(block_hash), file::tx_hash.eq(tx_hash), )) @@ -224,6 +225,26 @@ impl File { Ok(file_record) } + /// Check if any file record with the given file key is 
currently in the bucket forest. + /// + /// This is useful when creating new file records for repeated storage requests + /// to inherit the bucket membership status from previous requests, since for example if the + /// MSP was already storing the file key, the `MutationsApplied` event won't be emitted for it + /// so if we default `is_in_bucket` to false it would be incorrectly marked as not in the bucket. + pub async fn is_file_key_in_bucket<'a>( + conn: &mut DbConnection<'a>, + file_key: impl AsRef<[u8]>, + ) -> Result { + let file_key = file_key.as_ref().to_vec(); + let count: i64 = file::table + .filter(file::file_key.eq(file_key)) + .filter(file::is_in_bucket.eq(true)) + .count() + .get_result(conn) + .await?; + Ok(count > 0) + } + pub async fn update_step<'a>( conn: &mut DbConnection<'a>, file_key: impl AsRef<[u8]>, @@ -306,6 +327,22 @@ impl File { Ok(count > 0) } + /// Check if a file has any MSP associations + /// + /// TODO: This check is not used for now, but should be used in the future to prevent the + /// indexer from trying to delete a file that still has associations and getting stuck. + pub async fn has_msp_associations<'a>( + conn: &mut DbConnection<'a>, + file_id: i64, + ) -> Result { + let count: i64 = msp_file::table + .filter(msp_file::file_id.eq(file_id)) + .count() + .get_result(conn) + .await?; + Ok(count > 0) + } + /// Delete file only if it has no BSP associations and is not in the bucket forest. /// The flag [`is_in_bucket`](File::is_in_bucket) is set to false or true based on the [`MutationsApplied`] event emitted by the proofs dealer pallet for catch all. /// Returns true if all files associated with the file key were deleted, false if any still has associations. @@ -636,8 +673,10 @@ impl File { /// Update the bucket membership status for a file. /// /// Updates `is_in_bucket` based on mutations applied to the bucket's forest. 
- /// The file is identified by both `file_key` and `onchain_bucket_id` to ensure - /// we're updating the correct file-bucket relationship. + /// The file is identified by both `file_key` and `onchain_bucket_id`. + /// + /// This updates all file records with the same file key (for cases where there were + /// multiple storage requests for the same file). pub async fn update_bucket_membership<'a>( conn: &mut DbConnection<'a>, file_key: impl AsRef<[u8]>, @@ -647,7 +686,8 @@ impl File { let file_key = file_key.as_ref().to_vec(); let onchain_bucket_id = onchain_bucket_id.as_ref().to_vec(); - // Get the file info + // Get the file info (bucket ID and size) from one of the file records since + // all records should have the same values let file_info: Option<(i64, i64)> = file::table .filter(file::file_key.eq(&file_key)) .filter(file::onchain_bucket_id.eq(&onchain_bucket_id)) @@ -656,7 +696,7 @@ impl File { .await .optional()?; - // Update the file's bucket membership status + // Update all file records with this file key to have the new bucket membership status diesel::update(file::table) .filter(file::file_key.eq(&file_key)) .filter(file::onchain_bucket_id.eq(&onchain_bucket_id)) diff --git a/client/indexer-service/src/handler.rs b/client/indexer-service/src/handler.rs index 402848e73..b8b0ae8ae 100644 --- a/client/indexer-service/src/handler.rs +++ b/client/indexer-service/src/handler.rs @@ -385,6 +385,13 @@ impl IndexerService { let block_hash_bytes = block_hash.as_bytes().to_vec(); let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec()); + // Check if this file key is already present in the bucket of the MSP + // In this scenario, this will always return false, since there's no other file record + // in the DB, but it's still good practice to check it. 
+ let is_in_bucket = File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()) .await?; + // Create file with Requested step since we will change it to Stored when the storage request is fulfilled File::create( conn, @@ -399,6 +406,7 @@ impl IndexerService { vec![], // No peer_ids available from confirmation event block_hash_bytes, tx_hash_bytes, + is_in_bucket, ) .await? } @@ -445,6 +453,14 @@ impl IndexerService { // Convert EVM tx hash to bytes if present let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec()); + // Check if this file key is already present in the bucket of the MSP + // This could happen if there was a previous storage request for this file key that + // the MSP accepted, and the new storage request was issued by the user to add redundancy to it. + // We do this check because in this scenario, the `MutationsApplied` event won't be emitted for this + // file key when the MSP accepts it, as the MSP is already storing it. + let is_in_bucket = + File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?; + File::create( conn, who, @@ -458,6 +474,7 @@ impl IndexerService { sql_peer_ids, block_hash_bytes, tx_hash_bytes, + is_in_bucket, ) .await?; } @@ -547,6 +564,12 @@ impl IndexerService { let block_hash_bytes = block_hash.as_bytes().to_vec(); let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec()); + // Check if this file key is already present in the bucket of the MSP + // In this scenario, this will always return false, since there's no other file record + // in the DB, but it's still a good practice to check it. + let is_in_bucket = + File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?; + // Create file with Requested step since we will change it to Stored when the storage request is fulfilled File::create( conn, @@ -561,6 +584,7 @@ impl IndexerService { vec![], // No peer_ids available from acceptance event block_hash_bytes, tx_hash_bytes, + is_in_bucket, ) .await? 
} @@ -732,9 +756,9 @@ impl IndexerService { // No storage, safe to delete immediately File::delete(conn, file_record.id).await?; log::debug!( - "Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately", - file_key, file_record.id - ); + "Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately", + file_key, file_record.id, + ); } } }