1 change: 1 addition & 0 deletions backend/lib/src/data/indexer_db/repository/postgres.rs
@@ -301,6 +301,7 @@ impl IndexerOpsMut for Repository {
vec![], // No peer_ids for simple test data
vec![0u8; 32], // Placeholder block hash for test data
None, // No transaction hash for test data
false, // Default to not in bucket for test data
)
.await?;

Empty file.
@@ -0,0 +1,8 @@
-- This migration cannot be safely reversed because we don't track which specific
-- file records had incorrect is_in_bucket=false values before the normalization.
--
-- Reverting this migration would require arbitrarily setting some records back to
-- is_in_bucket=false, which could recreate the inconsistent state we're trying to fix.
--
-- If you need to rollback, the safest approach is to restore from a database backup
-- taken before this migration was applied.
@@ -0,0 +1,16 @@
-- Normalize is_in_bucket status across all file records with the same file_key
--
-- If ANY file record with a given file_key has is_in_bucket=true, then ALL
-- records for that file_key should have is_in_bucket=true. This is because
Comment on lines +3 to +4
Contributor

So the only way this can happen is if the SR is already accepted by the MSP and the user sends another one to increase redundancy?

Contributor Author

Yes, as that's the only way to have more than one file record for the same file key.

-- the bucket forest only contains one instance of each file_key, so if it's
-- in the bucket, all storage request records for that file should reflect that.

UPDATE file
SET is_in_bucket = true
WHERE file_key IN (
-- Find all file_keys where at least one record has is_in_bucket=true
SELECT DISTINCT file_key
FROM file
WHERE is_in_bucket = true
)
AND is_in_bucket = false;
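
As a quick post-migration sanity check (a sketch, not part of this PR; it assumes direct SQL access to the indexer database), the following query should return zero rows once the normalization has been applied:

-- Hypothetical verification query: list file_keys that still have a mix of
-- is_in_bucket values after the UPDATE above. Expect no rows.
SELECT file_key
FROM file
GROUP BY file_key
HAVING bool_or(is_in_bucket) AND NOT bool_and(is_in_bucket);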
52 changes: 46 additions & 6 deletions client/indexer-db/src/models/file.rs
@@ -10,7 +10,7 @@ use shc_common::types::{FileMetadata, Fingerprint};

use crate::{
models::{Bucket, MultiAddress},
schema::{bucket, file, file_peer_id},
schema::{bucket, file, file_peer_id, msp_file},
DbConnection,
};

@@ -137,6 +137,7 @@ impl File {
peer_ids: Vec<crate::models::PeerId>,
block_hash: Vec<u8>,
tx_hash: Option<Vec<u8>>,
is_in_bucket: bool,
) -> Result<Self, diesel::result::Error> {
let file = diesel::insert_into(file::table)
.values((
@@ -150,7 +151,7 @@
file::step.eq(step as i32),
file::deletion_status.eq(None::<i32>),
file::deletion_signature.eq(None::<Vec<u8>>),
file::is_in_bucket.eq(false),
file::is_in_bucket.eq(is_in_bucket),
file::block_hash.eq(block_hash),
file::tx_hash.eq(tx_hash),
))
@@ -224,6 +225,26 @@ impl File {
Ok(file_record)
}

/// Check if any file record with the given file key is currently in the bucket forest.
///
/// This is useful when creating new file records for repeated storage requests, so they
/// inherit the bucket membership status from previous requests. For example, if the MSP
/// is already storing the file key, the `MutationsApplied` event won't be emitted for it,
/// so defaulting `is_in_bucket` to false would incorrectly mark the file as not in the bucket.
pub async fn is_file_key_in_bucket<'a>(
conn: &mut DbConnection<'a>,
file_key: impl AsRef<[u8]>,
) -> Result<bool, diesel::result::Error> {
let file_key = file_key.as_ref().to_vec();
let count: i64 = file::table
.filter(file::file_key.eq(file_key))
.filter(file::is_in_bucket.eq(true))
.count()
.get_result(conn)
.await?;
Ok(count > 0)
}

pub async fn update_step<'a>(
conn: &mut DbConnection<'a>,
file_key: impl AsRef<[u8]>,
@@ -306,6 +327,22 @@ impl File {
Ok(count > 0)
}

/// Check if a file has any MSP associations
///
/// TODO: This check is not used for now, but should be used in the future to prevent the
/// indexer from trying to delete a file that still has associations and getting stuck.
pub async fn has_msp_associations<'a>(
conn: &mut DbConnection<'a>,
file_id: i64,
) -> Result<bool, diesel::result::Error> {
let count: i64 = msp_file::table
.filter(msp_file::file_id.eq(file_id))
.count()
.get_result(conn)
.await?;
Ok(count > 0)
}

/// Delete file only if it has no BSP associations and is not in the bucket forest.
/// The [`is_in_bucket`](File::is_in_bucket) flag is set to true or false based on the [`MutationsApplied`] event emitted by the proofs dealer pallet, which acts as a catch-all.
/// Returns true if all files associated with the file key were deleted, false if any still has associations.
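
A possible shape for the future check described in the TODO on `has_msp_associations` (a sketch only; the guard, its placement in the deletion path, and the `Ok(false)` return are assumptions, not part of this PR):

// Hypothetical guard in the deletion path (assumption, not in this PR): skip deleting
// a file that still has MSP associations so the indexer doesn't get stuck retrying.
if File::has_msp_associations(conn, file_record.id).await? {
    log::debug!(
        "File with id {:?} still has MSP associations, skipping deletion",
        file_record.id,
    );
    return Ok(false);
}
File::delete(conn, file_record.id).await?;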
@@ -636,8 +673,10 @@ impl File {
/// Update the bucket membership status for a file.
///
/// Updates `is_in_bucket` based on mutations applied to the bucket's forest.
/// The file is identified by both `file_key` and `onchain_bucket_id` to ensure
/// we're updating the correct file-bucket relationship.
/// The file is identified by both `file_key` and `onchain_bucket_id`.
///
/// This updates all file records with the same file key (for cases where there were
/// multiple storage requests for the same file).
pub async fn update_bucket_membership<'a>(
conn: &mut DbConnection<'a>,
file_key: impl AsRef<[u8]>,
@@ -647,7 +686,8 @@
let file_key = file_key.as_ref().to_vec();
let onchain_bucket_id = onchain_bucket_id.as_ref().to_vec();

// Get the file info
// Get the file info (bucket ID and size) from one of the file records since
// all records should have the same values
let file_info: Option<(i64, i64)> = file::table
.filter(file::file_key.eq(&file_key))
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))
@@ -656,7 +696,7 @@
.await
.optional()?;

// Update the file's bucket membership status
// Update all file records with this file key to have the new bucket membership status
diesel::update(file::table)
.filter(file::file_key.eq(&file_key))
.filter(file::onchain_bucket_id.eq(&onchain_bucket_id))
28 changes: 25 additions & 3 deletions client/indexer-service/src/handler.rs
@@ -385,6 +385,13 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
let block_hash_bytes = block_hash.as_bytes().to_vec();
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());

// Check if this file key is already present in the bucket of the MSP
// In this scenario, this will always return false, since there's no other file record
// in the DB, but it's still a good practice to check it.
let is_in_bucket =
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec())
.await?;

// Create file with Requested step since we will change it to Stored when the storage request is fulfilled
File::create(
conn,
@@ -399,6 +406,7 @@
vec![], // No peer_ids available from confirmation event
block_hash_bytes,
tx_hash_bytes,
is_in_bucket,
)
.await?
}
@@ -445,6 +453,12 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
// Convert EVM tx hash to bytes if present
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());

// Check if this file key is already present in the bucket of the MSP
// This is needed because the `MutationsApplied` event won't be emitted for this file
// key when the MSP accepts it, since the MSP is already storing it.
let is_in_bucket =
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?;

File::create(
conn,
who,
@@ -458,6 +472,7 @@
sql_peer_ids,
block_hash_bytes,
tx_hash_bytes,
is_in_bucket,
)
.await?;
}
@@ -547,6 +562,12 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
let block_hash_bytes = block_hash.as_bytes().to_vec();
let tx_hash_bytes = evm_tx_hash.map(|h| h.as_bytes().to_vec());

// Check if this file key is already present in the bucket of the MSP
Contributor

So when the user asks for more redundancy, the MSP needs to "re-accept" it, right?

Contributor Author

Yes, otherwise the new storage request will expire without an MSP response, so it will be considered rejected.

// This is because the `MutationsApplied` event won't be emitted for this file key,
// since the MSP was already storing it.
let is_in_bucket =
File::is_file_key_in_bucket(conn, file_key.as_ref().to_vec()).await?;

// Create file with Requested step since we will change it to Stored when the storage request is fulfilled
File::create(
conn,
@@ -561,6 +582,7 @@
vec![], // No peer_ids available from acceptance event
block_hash_bytes,
tx_hash_bytes,
is_in_bucket,
)
.await?
}
Expand Down Expand Up @@ -732,9 +754,9 @@ impl<Runtime: StorageEnableRuntime> IndexerService<Runtime> {
// No storage, safe to delete immediately
File::delete(conn, file_record.id).await?;
log::debug!(
"Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately",
file_key, file_record.id
);
"Incomplete storage request for file key {:?} and id {:?} is not being stored, deleted immediately",
file_key, file_record.id,
);
}
}
}