diff --git a/Cargo.lock b/Cargo.lock index 923112ed1..af1776575 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "async-task" version = "4.7.1" @@ -4576,6 +4598,21 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "fork-tree" version = "13.0.1" @@ -6093,6 +6130,22 @@ dependencies = [ "webpki-roots 1.0.4", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.18" @@ -6112,9 +6165,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2 0.6.1", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -8485,6 +8540,23 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "netlink-packet-core" version = "0.7.0" @@ -8872,12 +8944,50 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + 
[[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -12737,6 +12847,8 @@ dependencies = [ "bytes", "fallible-iterator 0.2.0", "postgres-protocol", + "serde_core", + "serde_json", ] [[package]] @@ -13678,16 +13790,21 @@ checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2 0.4.12", "http 1.4.0", "http-body 1.0.1", "http-body-util", "hyper 1.8.1", "hyper-rustls 0.27.7", + "hyper-tls", "hyper-util", "js-sys", "log", + "mime", + "native-tls", "percent-encoding", "pin-project-lite", "quinn 0.11.9", @@ -13698,6 +13815,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", @@ -16685,6 +16803,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives 17.1.0 (git+https://github.com/paritytech/polkadot-sdk.git?tag=polkadot-stable2412-6)", "polkadot-runtime-common 18.1.0 (git+https://github.com/paritytech/polkadot-sdk.git?tag=polkadot-stable2412-6)", + "reqwest", "rocksdb", "sc-client-api", "sc-network", @@ -16746,9 +16865,11 @@ dependencies = [ "anyhow", "array-bytes", "async-channel 1.9.0", + "async-stream", "async-trait", "axum", "axum-extra", + "bytes", "chrono", "frame-benchmarking 39.1.0", "frame-benchmarking-cli", @@ -16775,6 +16896,7 @@ dependencies = [ "polkadot-primitives 17.1.0 (git+https://github.com/paritytech/polkadot-sdk.git?tag=polkadot-stable2412-6)", "priority-queue", "rand 0.8.5", + "reqwest", "rocksdb", "sc-client-api", "sc-network", @@ -20280,6 +20402,16 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.15" @@ -21958,6 +22090,17 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-registry" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" +dependencies = [ + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-result" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 9f4465f05..95224b7ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ anyhow = "1.0.81" array-bytes = "6.1" async-channel = "1.8.0" async-io = "2.3.2" +async-stream = "0.3" async-trait = "0.1.42" axum = "0.8" axum-extra = { version = "0.10", features = [ @@ -59,6 +60,7 @@ http = "1.1" base64 = "0.21" bigdecimal = { version = "0.4.5", features = ["serde"] } bincode = "1.3.3" +bytes = "1" cfg-if = { version = "1.0.4" } clap = { version = "4.5.3", features = ["derive", "env"] } chrono = "0.4" @@ -95,7 +97,8 @@ prost = "0.12" prost-build = "0.12.3" rand = "0.8.5" reference-trie = "0.29.1" -rustls = { version = "0.23", default-features = false, features = 
["ring"] } +reqwest = { version = "0.12", features = ["stream"] } +rustls = { version = "0.23", default-features = false, features = ["ring", "tls12"] } rustls-platform-verifier = "0.5" rustls-pemfile = "2.2" scale-info = { version = "2.11.0", default-features = false, features = [ diff --git a/client/Cargo.toml b/client/Cargo.toml index f40e26cbd..9311c2910 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,9 +18,11 @@ workspace = true anyhow = { workspace = true } array-bytes = { workspace = true } async-channel = { workspace = true } +async-stream = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } axum-extra = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } hex = { workspace = true, default-features = true } kvdb = { workspace = true } @@ -30,6 +32,7 @@ lru = { workspace = true } ordered-float = { workspace = true } priority-queue = { workspace = true } rand = { workspace = true } +reqwest = { workspace = true } sysinfo = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/client/blockchain-service-db/Cargo.toml b/client/blockchain-service-db/Cargo.toml index 0360e13ab..1470e333d 100644 --- a/client/blockchain-service-db/Cargo.toml +++ b/client/blockchain-service-db/Cargo.toml @@ -25,10 +25,10 @@ log = { workspace = true } rustls = { workspace = true } rustls-platform-verifier = { workspace = true } serde = { workspace = true, default-features = true } -serde_json = { workspace = true } +serde_json = { workspace = true, features = ["std"] } thiserror = { workspace = true } tokio = { workspace = true } -tokio-postgres = { workspace = true } +tokio-postgres = { workspace = true, features = ["with-serde_json-1"] } tokio-postgres-rustls = { workspace = true } rustls-pemfile = { workspace = true } sc-transaction-pool-api = { workspace = true } diff --git a/client/blockchain-service-db/migrations/00000000000002_create_leader_info/down.sql b/client/blockchain-service-db/migrations/00000000000002_create_leader_info/down.sql new file mode 100644 index 000000000..cf7e82cf3 --- /dev/null +++ b/client/blockchain-service-db/migrations/00000000000002_create_leader_info/down.sql @@ -0,0 +1,4 @@ +-- Drop the leader_info table and related objects +DROP TRIGGER IF EXISTS trg_leader_info_updated_at ON leader_info; +DROP FUNCTION IF EXISTS set_leader_info_updated_at(); +DROP TABLE IF EXISTS leader_info; diff --git a/client/blockchain-service-db/migrations/00000000000002_create_leader_info/up.sql b/client/blockchain-service-db/migrations/00000000000002_create_leader_info/up.sql new file mode 100644 index 000000000..706cfd018 --- /dev/null +++ b/client/blockchain-service-db/migrations/00000000000002_create_leader_info/up.sql @@ -0,0 +1,29 @@ +-- Create a singleton table to store leader information +-- The leader (node holding the advisory lock) writes its metadata here +-- Metadata is JSON format containing host/port information +CREATE TABLE IF NOT EXISTS leader_info ( + id INTEGER PRIMARY KEY CHECK (id = 1), -- Enforce singleton (only one row allowed) + metadata JSONB NOT NULL, -- Leader metadata + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Pre-insert the singleton row to simplify upserts +INSERT INTO leader_info (id, metadata, updated_at) +VALUES (1, '{}'::jsonb, now()) +ON CONFLICT (id) DO NOTHING; + +-- Update timestamp trigger for leader_info +CREATE OR REPLACE FUNCTION set_leader_info_updated_at() RETURNS trigger AS $$ +BEGIN + NEW.updated_at = now(); + RETURN NEW; 
+END; +$$ LANGUAGE plpgsql; + +DO $$ BEGIN + CREATE TRIGGER trg_leader_info_updated_at + BEFORE UPDATE ON leader_info + FOR EACH ROW EXECUTE FUNCTION set_leader_info_updated_at(); +EXCEPTION WHEN duplicate_object THEN + NULL; +END $$; diff --git a/client/blockchain-service-db/src/leadership.rs b/client/blockchain-service-db/src/leadership.rs index 647c8f051..2d41fd888 100644 --- a/client/blockchain-service-db/src/leadership.rs +++ b/client/blockchain-service-db/src/leadership.rs @@ -1,5 +1,6 @@ use diesel::ConnectionError; use log::warn; +use serde::{Deserialize, Serialize}; use tokio_postgres::Client; use tokio_postgres_rustls::MakeRustlsConnect; @@ -17,6 +18,21 @@ pub type LeadershipClient = Client; /// In future, this could be derived from the MSP/BSP ID. pub const LEADERSHIP_LOCK_KEY: i64 = 1; +/// Advertised endpoints of a node. +/// +/// Each node maintains its own advertised endpoints. When a node becomes the leader, +/// it submits these endpoints as leader metadata to the database so followers can discover them. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeAdvertisedEndpoints { + /// WebSocket RPC URL for blockchain queries and transactions + /// Example: "ws://192.168.1.100:9944" + pub rpc_url: String, + + /// Trusted file transfer server URL for efficient file uploads + /// Example: "http://192.168.1.100:7070" + pub trusted_file_transfer_server_url: String, +} + /// Open a dedicated, TLS-enabled Postgres connection for leadership purposes. /// /// This connection is non-pooled and intended to live for the lifetime of the process. @@ -85,3 +101,88 @@ pub async fn release_leadership(client: &LeadershipClient, key: i64) -> Result<(), DbSetupError> + +/// Update the leader_info singleton row (id = 1) with this node's advertised endpoints so followers can discover the leader. +pub async fn update_leader_info( + client: &LeadershipClient, + endpoints: &NodeAdvertisedEndpoints, +) -> Result<(), DbSetupError> { + // Serialize to JSON using serde_json + let metadata = serde_json::to_value(endpoints).map_err(|e| { + DbSetupError::ConnectionError(ConnectionError::BadConnection(format!( + "Failed to serialize NodeAdvertisedEndpoints to JSON: {}", + e + ))) + })?; + + // Update the singleton row (id=1) with the new metadata + client + .execute( + "UPDATE leader_info SET metadata = $1 WHERE id = 1", + &[&metadata], + ) + .await + .map_err(|e| { + DbSetupError::ConnectionError(ConnectionError::BadConnection(format!( + "Failed to update leader info: {}", + e + ))) + })?; + + Ok(()) +} + +/// Get the current leader's advertised endpoints from the leader_info singleton table. +/// +/// Returns `Ok(Some(endpoints))` if leader info exists and is valid, +/// `Ok(None)` if no leader info is registered or metadata is empty, +/// or `Err` if the query fails or JSON is malformed. +/// +/// # Example +/// ```no_run +/// use shc_blockchain_service_db::leadership::get_leader_info; +/// +/// # async fn example(client: &tokio_postgres::Client) -> Result<(), Box<dyn std::error::Error>> { +/// if let Some(endpoints) = get_leader_info(client).await?
{ +/// println!("Leader RPC: {}", endpoints.rpc_url); +/// println!("Leader File Transfer: {}", endpoints.trusted_file_transfer_server_url); +/// } +/// # Ok(()) +/// # } +/// ``` +pub async fn get_leader_info( + client: &LeadershipClient, +) -> Result<Option<NodeAdvertisedEndpoints>, DbSetupError> { + let rows = client + .query("SELECT metadata FROM leader_info WHERE id = 1", &[]) + .await + .map_err(|e| { + DbSetupError::ConnectionError(ConnectionError::BadConnection(format!( + "Failed to query leader info: {}", + e + ))) + })?; + + if rows.is_empty() { + return Ok(None); + } + + let metadata: serde_json::Value = rows[0].get(0); + + // If metadata is an empty object, consider it as no info + if metadata.as_object().map(|o| o.is_empty()).unwrap_or(false) { + return Ok(None); + } + + // Deserialize to NodeAdvertisedEndpoints using serde_json + let endpoints = serde_json::from_value(metadata).map_err(|e| { + DbSetupError::ConnectionError(ConnectionError::BadConnection(format!( + "Failed to deserialize NodeAdvertisedEndpoints from JSON: {}", + e + ))) + })?; + Ok(Some(endpoints)) +} diff --git a/client/blockchain-service-db/src/lib.rs b/client/blockchain-service-db/src/lib.rs index 82fe116b0..7247cf4a0 100644 --- a/client/blockchain-service-db/src/lib.rs +++ b/client/blockchain-service-db/src/lib.rs @@ -22,6 +22,7 @@ use rustls_platform_verifier::ConfigVerifierExt; use thiserror::Error; pub mod leadership; +pub use leadership::NodeAdvertisedEndpoints; pub mod models; pub mod schema; pub mod store; diff --git a/client/blockchain-service-db/src/schema.rs b/client/blockchain-service-db/src/schema.rs index 66163e63c..7679f3ebb 100644 --- a/client/blockchain-service-db/src/schema.rs +++ b/client/blockchain-service-db/src/schema.rs @@ -1,5 +1,13 @@ // @generated automatically by Diesel CLI. +diesel::table! { + leader_info (id) { + id -> Int4, + metadata -> Jsonb, + updated_at -> Timestamptz, + } +} + diesel::table! { pending_transactions (account_id, nonce) { account_id -> Bytea, @@ -14,3 +22,5 @@ diesel::table! { updated_at -> Timestamptz, } } + +diesel::allow_tables_to_appear_in_same_query!(leader_info, pending_transactions,); diff --git a/client/blockchain-service/Cargo.toml b/client/blockchain-service/Cargo.toml index bdd360ef5..fede7f6a0 100644 --- a/client/blockchain-service/Cargo.toml +++ b/client/blockchain-service/Cargo.toml @@ -22,6 +22,7 @@ codec = { workspace = true } futures = { workspace = true } lazy-static = { workspace = true } log = { workspace = true } +reqwest = { workspace = true } rocksdb = { workspace = true } serde = { workspace = true, default-features = true } serde_json = { workspace = true } diff --git a/client/blockchain-service/src/commands.rs b/client/blockchain-service/src/commands.rs index d00833be2..2071221f4 100644 --- a/client/blockchain-service/src/commands.rs +++ b/client/blockchain-service/src/commands.rs @@ -214,6 +214,12 @@ pub enum BlockchainServiceCommand { /// - After extrinsic submission failures (may be transient) #[command(mode = "FireAndForget")] RemoveFileKeyStatus { file_key: FileKey }, + /// Get the current leader's advertised endpoints from the leadership database. + /// + /// Returns the leader's RPC URL and trusted file transfer server URL. + /// Returns None if no leader info is available or if leadership is not enabled. + #[command(success_type = Option<shc_blockchain_service_db::NodeAdvertisedEndpoints>, error_type = shc_blockchain_service_db::DbSetupError)] + GetLeaderInfo, } /// Interface for interacting with the BlockchainService actor.
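Editor's note (an illustrative sketch, not part of the patch): the JSONB payload that `update_leader_info` writes into `leader_info.metadata`, and that `get_leader_info` reads back, is simply the serde round-trip of `NodeAdvertisedEndpoints`. The host/port values below are hypothetical:

```rust
use shc_blockchain_service_db::NodeAdvertisedEndpoints;

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical endpoints; in the real flow these come from the node's config.
    let endpoints = NodeAdvertisedEndpoints {
        rpc_url: "ws://192.168.1.100:9944".to_string(),
        trusted_file_transfer_server_url: "http://192.168.1.100:7070".to_string(),
    };

    // What the leader stores in leader_info.metadata (JSONB):
    let metadata = serde_json::to_value(&endpoints)?;
    assert_eq!(metadata["rpc_url"], "ws://192.168.1.100:9944");

    // What a follower recovers via get_leader_info:
    let decoded: NodeAdvertisedEndpoints = serde_json::from_value(metadata)?;
    assert_eq!(decoded, endpoints);
    Ok(())
}
```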
diff --git a/client/blockchain-service/src/events.rs b/client/blockchain-service/src/events.rs index 285c97041..a61ad3c1f 100644 --- a/client/blockchain-service/src/events.rs +++ b/client/blockchain-service/src/events.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use codec::{Decode, Encode}; use sc_network::Multiaddr; +use sp_core::H256; use tokio::sync::{oneshot, Mutex}; use shc_actors_derive::{ActorEvent, ActorEventBus}; @@ -385,6 +386,20 @@ pub struct DistributeFileToBsp { pub bsp_id: BackupStorageProviderId, } +/// Event emitted when a follower MSP needs to download a file key from the leader. +/// This event is emitted when a file key is added to the follower's download list. +#[derive(Debug, Clone, ActorEvent)] +#[actor(actor = "blockchain_service")] +pub struct FollowerFileKeyToDownload { + pub file_key: H256, +} + +/// Event emitted to trigger processing one iteration of follower downloads. +/// This event can be emitted periodically or when there are pending downloads. +#[derive(Debug, Clone, ActorEvent)] +#[actor(actor = "blockchain_service")] +pub struct ProcessFollowerDownloads {} + /// The event bus provider for the BlockchainService actor. /// /// It holds the event buses for the different events that the BlockchainService actor diff --git a/client/blockchain-service/src/handler.rs b/client/blockchain-service/src/handler.rs index 580e75307..f11975ed4 100644 --- a/client/blockchain-service/src/handler.rs +++ b/client/blockchain-service/src/handler.rs @@ -185,6 +185,10 @@ where pub enable_msp_distribute_files: bool, /// Optional Postgres URL for the pending transactions DB. If None, DB is disabled. pub pending_db_url: Option<String>, + /// Advertised RPC URL for leader registration (optional). + pub advertised_rpc_url: Option<String>, + /// Advertised trusted file transfer server URL for leader registration (optional).
+ pub advertised_trusted_file_transfer_server_url: Option<String>, } impl<Runtime> Default for BlockchainServiceConfig<Runtime> where @@ -198,6 +202,8 @@ where peer_id: None, enable_msp_distribute_files: false, pending_db_url: None, + advertised_rpc_url: None, + advertised_trusted_file_transfer_server_url: None, } } } @@ -1547,6 +1553,26 @@ where } } } + BlockchainServiceCommand::GetLeaderInfo { callback } => { + use shc_blockchain_service_db::leadership::get_leader_info; + + let result = match &self.leadership_conn { + Some(client) => get_leader_info(client).await, + None => { + // No leadership connection available, return None + Ok(None) + } + }; + + match callback.send(result) { + Ok(_) => { + trace!(target: LOG_TARGET, "Leader info sent successfully"); + } + Err(e) => { + error!(target: LOG_TARGET, "Failed to send leader info: {:?}", e); + } + } + } } // Record command completion diff --git a/client/blockchain-service/src/handler_msp.rs b/client/blockchain-service/src/handler_msp.rs index e2af5cda4..ef44c1130 100644 --- a/client/blockchain-service/src/handler_msp.rs +++ b/client/blockchain-service/src/handler_msp.rs @@ -29,9 +29,9 @@ use crate::{ DistributeFileToBsp, FinalisedBucketMovedAway, FinalisedBucketMutationsApplied, FinalisedMspStopStoringBucketInsolventUser, FinalisedMspStoppedStoringBucket, FinalisedStorageRequestRejected, ForestWriteLockTaskData, MoveBucketRequestedForMsp, - NewStorageRequest, ProcessMspRespondStoringRequest, ProcessMspRespondStoringRequestData, - ProcessStopStoringForInsolventUserRequest, ProcessStopStoringForInsolventUserRequestData, - StartMovedBucketDownload, + NewStorageRequest, ProcessFollowerDownloads, ProcessMspRespondStoringRequest, + ProcessMspRespondStoringRequestData, ProcessStopStoringForInsolventUserRequest, + ProcessStopStoringForInsolventUserRequestData, StartMovedBucketDownload, }, handler::LOG_TARGET, types::{FileDistributionInfo, FileKeyStatus, ManagedProvider, MultiInstancesNodeRole}, @@ -251,6 +251,7 @@ where /// 1. Monitor for new pending storage requests and emit events for processing. /// 2. Check for BSPs who volunteered for files this MSP has to distribute, and spawn task /// to distribute them. + /// 3. For follower nodes, emit ProcessFollowerDownloads event to trigger download attempts. pub(crate) async fn msp_end_block_processing( &mut self, block_hash: &Runtime::Hash, @@ -272,6 +273,11 @@ where // Distribute files to BSPs self.spawn_distribute_file_to_bsps_tasks(block_hash, managed_msp_id); + + // For follower nodes, emit ProcessFollowerDownloads every block + if matches!(self.role, MultiInstancesNodeRole::Follower) { + self.emit(ProcessFollowerDownloads {}); + } } /// Processes finality events that are only relevant for an MSP.
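Editor's note (an illustrative sketch, not part of the patch): the leader election that the `utils.rs` hunk below hooks into rests on a Postgres session advisory lock plus the `leader_info` row added by the migration. Assuming the standard `pg_try_advisory_lock` pattern that `try_acquire_leadership` and `update_leader_info` wrap (endpoint values are hypothetical), the handshake amounts to the following; note that binding a `serde_json::Value` parameter is exactly what the `with-serde_json-1` feature enabled above provides:

```rust
/// Sketch: try to acquire the leadership lock and, on success, publish this node's endpoints.
async fn leader_handshake(client: &tokio_postgres::Client) -> anyhow::Result<bool> {
    // LEADERSHIP_LOCK_KEY = 1: exactly one session in the cluster gets `true`,
    // and it holds the lock until that session ends.
    let row = client
        .query_one("SELECT pg_try_advisory_lock($1)", &[&1i64])
        .await?;
    let is_leader: bool = row.get(0);

    if is_leader {
        // Publish endpoints (illustrative values) so followers can discover the leader.
        let metadata = serde_json::json!({
            "rpc_url": "ws://192.168.1.100:9944",
            "trusted_file_transfer_server_url": "http://192.168.1.100:7070",
        });
        client
            .execute("UPDATE leader_info SET metadata = $1 WHERE id = 1", &[&metadata])
            .await?;
    }
    Ok(is_leader)
}
```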
diff --git a/client/blockchain-service/src/utils.rs b/client/blockchain-service/src/utils.rs index a715d3c8a..1d7ff1c8f 100644 --- a/client/blockchain-service/src/utils.rs +++ b/client/blockchain-service/src/utils.rs @@ -15,9 +15,12 @@ use sc_network::Multiaddr; use sc_transaction_pool_api::TransactionStatus; use shc_actors_framework::actor::Actor; use shc_blockchain_service_db::{ - leadership::{open_leadership_connection, try_acquire_leadership, LEADERSHIP_LOCK_KEY}, + leadership::{ + open_leadership_connection, try_acquire_leadership, update_leader_info, LEADERSHIP_LOCK_KEY, + }, setup_db_pool, store::PendingTxStore, + NodeAdvertisedEndpoints, }; use shc_common::{ blockchain_utils::{ @@ -48,8 +51,9 @@ use substrate_frame_rpc_system::AccountNonceApi; use crate::{ events::{ - AcceptedBspVolunteer, LastChargeableInfoUpdated, NewStorageRequest, NotifyPeriod, - SlashableProvider, SpStopStoringInsolventUser, UserWithoutFunds, + AcceptedBspVolunteer, FollowerFileKeyToDownload, LastChargeableInfoUpdated, + NewStorageRequest, NotifyPeriod, SlashableProvider, SpStopStoringInsolventUser, + UserWithoutFunds, }, handler::LOG_TARGET, state::LastProcessedBlockCf, @@ -141,9 +145,36 @@ where target: LOG_TARGET, "This node acquired the leadership advisory lock; running as LEADER" ); + + // Register this node's advertised endpoints as leader metadata so followers can find us + let endpoints = NodeAdvertisedEndpoints { + rpc_url: self.config.advertised_rpc_url.clone().expect("RPC URL should be set for leader registration"), + trusted_file_transfer_server_url: self.config.advertised_trusted_file_transfer_server_url.clone().expect("Trusted file transfer server URL should be set for leader registration"), + }; + + if let Err(e) = update_leader_info(&client, &endpoints).await { + // TODO: See how to handle this. One option is to wrap lock + acquisition and info submission within a transaction (so the lock + is only granted if the info is submitted correctly); the + other is a retry with backoff, dropping after N retries + error!( + target: LOG_TARGET, + "Failed to register leader info in database: {:?}. Followers may not be able to discover this leader.", + e + ); + } else { + info!( + target: LOG_TARGET, + "✅ Leader endpoints registered in database: rpc={}, trusted_file_transfer={}", + endpoints.rpc_url, + endpoints.trusted_file_transfer_server_url + ); + } + self.pending_tx_store = Some(PendingTxStore::new(pool)); self.leadership_conn = Some(client); self.role = MultiInstancesNodeRole::Leader; + info!(target: LOG_TARGET, "🗃️ Pending transactions store initialised"); } Ok(false) => { @@ -1881,6 +1912,19 @@ where })?; debug!(target: LOG_TARGET, "Inserted file keys: {:?}", inserted_file_keys); + + // MSP Follower: Track FileKeys to retrieve from Leader + if matches!(self.role, crate::types::MultiInstancesNodeRole::Follower) { + if let Some(crate::types::ManagedProvider::Msp(_msp_handler)) = + self.maybe_managed_provider.as_mut() + { + debug!(target: LOG_TARGET, "MSP Follower: Tracked file key {:x} for retrieval from Leader", file_key); + // Emit event to trigger download task + self.emit(FollowerFileKeyToDownload { + file_key: (*file_key).into(), + }); + } + } } TrieMutation::Remove(_) => { fs.write().await.delete_file_key(file_key).map_err(|e| { diff --git a/client/src/builder.rs b/client/src/builder.rs index 6a12b00a8..61f788190 100644 --- a/client/src/builder.rs +++ b/client/src/builder.rs @@ -237,7 +237,6 @@ where } } } - /// Spawn the Fisherman Service.
/// /// The Fisherman Service monitors the blockchain for file deletion requests @@ -890,6 +889,13 @@ pub struct BlockchainServiceOptions { pub enable_msp_distribute_files: Option<bool>, /// Postgres database URL for pending transactions persistence. If not provided, pending transactions will not be persisted. pub pending_db_url: Option<String>, + /// Advertised RPC URL for leader registration. If not set, will be derived from actual RPC server configuration. + /// Example: "ws://192.168.1.100:9944" + pub advertised_rpc_url: Option<String>, + /// Advertised trusted file transfer server URL for leader registration. + /// If not set, will be derived from actual trusted file transfer server configuration. + /// Example: "http://192.168.1.100:7070" + pub advertised_trusted_file_transfer_server_url: Option<String>, } impl Into<BlockchainServiceConfig<Runtime>> @@ -910,6 +916,9 @@ impl Into<BlockchainServiceConfig<Runtime>> peer_id, enable_msp_distribute_files: self.enable_msp_distribute_files.unwrap_or(false), pending_db_url: self.pending_db_url, + advertised_rpc_url: self.advertised_rpc_url, + advertised_trusted_file_transfer_server_url: self + .advertised_trusted_file_transfer_server_url, } } } diff --git a/client/src/handler.rs b/client/src/handler.rs index 642567dee..22a8384f8 100644 --- a/client/src/handler.rs +++ b/client/src/handler.rs @@ -17,12 +17,12 @@ use shc_blockchain_service::{ FinalisedBucketMovedAway, FinalisedBucketMutationsApplied, FinalisedMspStopStoringBucketInsolventUser, FinalisedMspStoppedStoringBucket, FinalisedStorageRequestRejected, FinalisedTrieRemoveMutationsAppliedForBsp, - LastChargeableInfoUpdated, MoveBucketAccepted, MoveBucketExpired, MoveBucketRejected, - MoveBucketRequested, MoveBucketRequestedForMsp, MultipleNewChallengeSeeds, - NewStorageRequest, NotifyPeriod, ProcessConfirmStoringRequest, - ProcessMspRespondStoringRequest, ProcessStopStoringForInsolventUserRequest, - ProcessSubmitProofRequest, SlashableProvider, SpStopStoringInsolventUser, - StartMovedBucketDownload, UserWithoutFunds, + FollowerFileKeyToDownload, LastChargeableInfoUpdated, MoveBucketAccepted, + MoveBucketExpired, MoveBucketRejected, MoveBucketRequested, MoveBucketRequestedForMsp, + MultipleNewChallengeSeeds, NewStorageRequest, NotifyPeriod, + ProcessConfirmStoringRequest, ProcessFollowerDownloads, ProcessMspRespondStoringRequest, + ProcessStopStoringForInsolventUserRequest, ProcessSubmitProofRequest, SlashableProvider, + SpStopStoringInsolventUser, StartMovedBucketDownload, UserWithoutFunds, }, handler::BlockchainServiceConfig, BlockchainService, @@ -52,6 +52,7 @@ use crate::{ msp_delete_bucket::MspDeleteBucketTask, msp_delete_file::MspDeleteFileTask, msp_distribute_file::MspDistributeFileTask, + msp_follower_download_file::MspFollowerDownloadFileTask, msp_move_bucket::{MspMoveBucketConfig, MspRespondMoveBucketTask}, msp_retry_bucket_move::MspRetryBucketMoveTask, msp_stop_storing_insolvent_user::MspStopStoringInsolventUserTask, @@ -336,6 +337,11 @@ where MspStopStoringInsolventUserTask, NotifyPeriod => MspChargeFeesTask, DistributeFileToBsp => MspDistributeFileTask, + // MspFollowerDownloadFileTask handles file downloads for MSP followers: + // - FollowerFileKeyToDownload events are emitted when a file key needs to be downloaded. + // - ProcessFollowerDownloads events trigger processing one download iteration. + FollowerFileKeyToDownload => MspFollowerDownloadFileTask, + ProcessFollowerDownloads => MspFollowerDownloadFileTask, // MspRemoveFinalisedFilesTask handles events for removing files from file storage after mutations are finalised.
FinalisedBucketMutationsApplied => MspDeleteFileTask, FinalisedStorageRequestRejected => MspDeleteFileTask, diff --git a/client/src/tasks/mod.rs b/client/src/tasks/mod.rs index 5b0f4d358..34f5afd89 100644 --- a/client/src/tasks/mod.rs +++ b/client/src/tasks/mod.rs @@ -11,6 +11,7 @@ pub mod msp_charge_fees; pub mod msp_delete_bucket; pub mod msp_delete_file; pub mod msp_distribute_file; +pub mod msp_follower_download_file; pub mod msp_move_bucket; pub mod msp_retry_bucket_move; pub mod msp_stop_storing_insolvent_user; diff --git a/client/src/tasks/msp_follower_download_file.rs b/client/src/tasks/msp_follower_download_file.rs new file mode 100644 index 000000000..d111eca13 --- /dev/null +++ b/client/src/tasks/msp_follower_download_file.rs @@ -0,0 +1,235 @@ +//! # MSP Follower Download File Task +//! +//! This module handles the file download flow for MSP Followers. +//! +//! ### Event Handlers +//! +//! - [`FollowerFileKeyToDownload`]: Emitted when a file key needs to be downloaded. +//! The handler adds the file key to the internal download list. +//! +//! - [`ProcessFollowerDownloads`]: Emitted every block to process pending downloads. +//! The handler attempts to download each file once per block. Failed downloads remain +//! in the queue and will be retried on the next block. + +use anyhow::anyhow; +use std::collections::HashSet; +use std::sync::Arc; + +use sc_tracing::tracing::*; +use shc_actors_framework::event_bus::EventHandler; +use shc_blockchain_service::{ + commands::BlockchainServiceCommandInterface, + events::{FollowerFileKeyToDownload, ProcessFollowerDownloads}, +}; +use shc_common::traits::StorageEnableRuntime; +use sp_core::H256; +use tokio::sync::RwLock; + +use crate::{ + handler::StorageHubHandler, + trusted_file_transfer::utils::download_file_from_peer, + types::{MspForestStorageHandlerT, ShNodeType}, +}; + +const LOG_TARGET: &str = "msp-follower-download-file-task"; + +/// Handles the file download flow for MSP Followers. +/// +/// This task processes events to download files from the leader MSP. +/// See [module documentation](self) for the event flow. +pub struct MspFollowerDownloadFileTask<NT, Runtime> +where + NT: ShNodeType, + NT::FSH: MspForestStorageHandlerT, + Runtime: StorageEnableRuntime, +{ + storage_hub_handler: StorageHubHandler<NT, Runtime>, + /// Internal list of file keys to download + file_keys_to_download: Arc<RwLock<HashSet<H256>>>, +} + +impl<NT, Runtime> Clone for MspFollowerDownloadFileTask<NT, Runtime> +where + NT: ShNodeType, + NT::FSH: MspForestStorageHandlerT, + Runtime: StorageEnableRuntime, +{ + fn clone(&self) -> MspFollowerDownloadFileTask<NT, Runtime> { + Self { + storage_hub_handler: self.storage_hub_handler.clone(), + file_keys_to_download: self.file_keys_to_download.clone(), + } + } +} + +impl<NT, Runtime> MspFollowerDownloadFileTask<NT, Runtime> +where + NT: ShNodeType, + NT::FSH: MspForestStorageHandlerT, + Runtime: StorageEnableRuntime, +{ + pub fn new(storage_hub_handler: StorageHubHandler<NT, Runtime>) -> Self { + Self { + storage_hub_handler, + file_keys_to_download: Arc::new(RwLock::new(HashSet::new())), + } + } +} + +/// Handles the [`FollowerFileKeyToDownload`] event. +/// +/// This event is emitted when a file key needs to be downloaded from the leader. +/// The handler adds the file key to the internal download list.
+impl<NT, Runtime> EventHandler<FollowerFileKeyToDownload> + for MspFollowerDownloadFileTask<NT, Runtime> +where + NT: ShNodeType + 'static, + NT::FSH: MspForestStorageHandlerT, + Runtime: StorageEnableRuntime, +{ + async fn handle_event(&mut self, event: FollowerFileKeyToDownload) -> anyhow::Result<String> { + let file_key_hash = event.file_key; + + info!( + target: LOG_TARGET, + "Adding file key [{:x}] to download list", + file_key_hash + ); + + // Add file key to the download list + { + let mut keys = self.file_keys_to_download.write().await; + keys.insert(file_key_hash); + } + + Ok(format!( + "Added file key [{:x}] to download list", + file_key_hash + )) + } +} + +/// Handles the [`ProcessFollowerDownloads`] event. +/// +/// This event triggers processing all pending downloads. +/// The handler processes all files from the download list, continuing even if individual +/// downloads fail. Only successfully downloaded files are removed from the list. +/// Failed downloads remain in the list and will be retried on the next block. +impl<NT, Runtime> EventHandler<ProcessFollowerDownloads> + for MspFollowerDownloadFileTask<NT, Runtime> +where + NT: ShNodeType + 'static, + NT::FSH: MspForestStorageHandlerT, + Runtime: StorageEnableRuntime, +{ + async fn handle_event(&mut self, _event: ProcessFollowerDownloads) -> anyhow::Result<String> { + // Get a snapshot of all file keys to download + let file_keys_to_download: Vec<H256> = { + let keys = self.file_keys_to_download.read().await; + if keys.is_empty() { + trace!( + target: LOG_TARGET, + "No files to download from leader" + ); + return Ok("No files to download".to_string()); + } + keys.iter().cloned().collect() + }; + + info!( + target: LOG_TARGET, + count = file_keys_to_download.len(), + "Processing {} files to download from peer", + file_keys_to_download.len() + ); + + // Get peer URL from blockchain service leadership DB + // Note: In the follower scenario, this is the leader's URL, but the function + // is generic and can download from any peer + let peer_url = match self + .storage_hub_handler + .blockchain + .get_leader_info() + .await? + { + Some(endpoints) => endpoints.trusted_file_transfer_server_url, + None => { + error!( + target: LOG_TARGET, + "No leader info found in database. Leader may not be available yet." + ); + return Err(anyhow!( + "No leader info found in database. Leader may not be available yet."
+ )); + } + }; + + let mut success_count = 0; + let mut error_count = 0; + let mut last_error: Option<anyhow::Error> = None; + + // Process each file once per event + for file_key in file_keys_to_download { + info!( + target: LOG_TARGET, + file_key = %file_key, + "Attempting to download file from peer" + ); + + // Download the file from the peer + match download_file_from_peer( + &peer_url, + &file_key, + &self.storage_hub_handler.file_storage, + ) + .await + { + Ok(()) => { + // Remove from download list on success + let mut keys = self.file_keys_to_download.write().await; + keys.remove(&file_key); + success_count += 1; + info!( + target: LOG_TARGET, + file_key = %file_key, + "Successfully downloaded file from peer" + ); + } + Err(e) => { + error_count += 1; + warn!( + target: LOG_TARGET, + file_key = %file_key, + error = %e, + "Failed to download file from peer, will retry on next block" + ); + last_error = Some(e); + // Keep the file in the download list for retry on next block + } + } + } + + // Return summary + if error_count > 0 { + warn!( + target: LOG_TARGET, + success_count = success_count, + error_count = error_count, + "Completed download processing with some errors" + ); + Err(last_error.unwrap_or_else(|| anyhow!("Unknown error occurred"))) + } else { + info!( + target: LOG_TARGET, + success_count = success_count, + "Successfully processed all downloads" + ); + Ok(format!( + "Processed {} files: {} successful, {} errors", + success_count + error_count, + success_count, + error_count + )) + } + } +} diff --git a/client/src/trusted_file_transfer/files.rs b/client/src/trusted_file_transfer/files.rs index ce7d6bae9..2a42fa5e2 100644 --- a/client/src/trusted_file_transfer/files.rs +++ b/client/src/trusted_file_transfer/files.rs @@ -1,6 +1,6 @@ //! File encoding/decoding utilities -use axum::body::Body; +use futures::Stream; use log::{error, info}; use shc_common::{ trusted_file_transfer::{read_chunk_with_id_from_buffer, CHUNK_ID_SIZE}, @@ -14,28 +14,39 @@ use tokio_stream::StreamExt; use crate::{trusted_file_transfer::server::LOG_TARGET, types::FileStorageT}; -/// Get chunks from a request body as a stream and write them to storage -pub(crate) async fn process_chunk_stream<FL>( +/// Get chunks from a stream and write them to storage. +/// +/// This generic function works with any stream that yields `Result<bytes::Bytes, E>`. +/// The stream format is: +/// [ChunkId: 8 bytes (u64, little-endian)][Chunk data: FILE_CHUNK_SIZE bytes]...
+/// [ChunkId: 8 bytes (u64, little-endian)][Chunk data: remaining bytes for last chunk] +/// +/// # Type Parameters +/// * `FL` - File storage type implementing `FileStorageT` +/// * `S` - Stream type that yields byte chunks +/// * `E` - Error type from the stream that can be converted to `anyhow::Error` +pub(crate) async fn process_chunk_stream<FL, S, E>( file_storage: &RwLock<FL>, file_key: &sp_core::H256, - request_body: Body, + mut stream: S, ) -> anyhow::Result<()> where FL: FileStorageT, + S: Stream<Item = Result<bytes::Bytes, E>> + Unpin, + E: Into<anyhow::Error>, { - let mut request_stream = request_body.into_data_stream(); let mut buffer = Vec::new(); let mut last_write_outcome = FileStorageWriteOutcome::FileIncomplete; - // Process request stream, storing chunks as they are received - while let Some(try_bytes) = request_stream.next().await { - let bytes = try_bytes?; + // Process stream, storing chunks as they are received + while let Some(try_bytes) = stream.next().await { + let bytes = try_bytes.map_err(|e| e.into())?; buffer.extend_from_slice(&bytes); while buffer.len() >= CHUNK_ID_SIZE + (FILE_CHUNK_SIZE as usize) { // Here we call with cap_at_file_chunk_size = true because we want to read chunk by chunk. // If there are remaining bytes in the buffer, they could belong to half a chunk that will be - // filled in the next iteration of the `while let Some(try_bytes) = request_stream.next().await` loop. + // filled in the next iteration of the `while let Some(try_bytes) = stream.next().await` loop. let (chunk_id, chunk_data) = read_chunk_with_id_from_buffer(&mut buffer, true)?; last_write_outcome = write_chunk(file_storage, file_key, &chunk_id, &chunk_data).await?; @@ -96,6 +107,7 @@ where #[cfg(test)] mod tests { use super::*; + use axum::body::Body; use shc_common::{ trusted_file_transfer::encode_chunk_with_id, types::{FileMetadata, StorageProofsMerkleTrieLayout}, @@ -144,7 +156,8 @@ mod tests { file_storage.insert_file(file_key, metadata).unwrap(); let file_storage = Arc::new(RwLock::new(file_storage)); - process_chunk_stream(&file_storage, &file_key, body) + let stream = body.into_data_stream(); + process_chunk_stream(&file_storage, &file_key, stream) .await .unwrap(); @@ -205,7 +218,8 @@ mod tests { file_storage.insert_file(file_key, metadata).unwrap(); let file_storage = Arc::new(RwLock::new(file_storage)); - process_chunk_stream(&file_storage, &file_key, body) + let stream = body.into_data_stream(); + process_chunk_stream(&file_storage, &file_key, stream) .await .unwrap(); diff --git a/client/src/trusted_file_transfer/mod.rs b/client/src/trusted_file_transfer/mod.rs index bf80efce3..931b56659 100644 --- a/client/src/trusted_file_transfer/mod.rs +++ b/client/src/trusted_file_transfer/mod.rs @@ -4,3 +4,4 @@ pub mod files; pub mod server; +pub mod utils; diff --git a/client/src/trusted_file_transfer/server.rs b/client/src/trusted_file_transfer/server.rs index af456ad19..7e2cc4735 100644 --- a/client/src/trusted_file_transfer/server.rs +++ b/client/src/trusted_file_transfer/server.rs @@ -7,7 +7,7 @@ use axum::{ extract::{DefaultBodyLimit, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::post, + routing::{get, post}, Router, }; use futures::FutureExt; @@ -18,12 +18,16 @@ use shc_blockchain_service::{ types::{MspRespondStorageRequest, RespondStorageRequest}, BlockchainService, }; -use shc_common::traits::StorageEnableRuntime; +use shc_common::{traits::StorageEnableRuntime, types::ChunkId}; use shc_file_transfer_service::{ commands::FileTransferServiceCommandInterface, FileTransferService, }; use
shc_forest_manager::traits::ForestStorageHandler; -use tokio::{net::TcpListener, sync::RwLock}; +use tokio::{ + net::TcpListener, + sync::{mpsc, RwLock}, +}; +use tokio_stream::wrappers::ReceiverStream; use crate::{trusted_file_transfer::files::process_chunk_stream, types::FileStorageT}; @@ -109,6 +113,7 @@ where let app = Router::new() .route("/upload/{file_key}", post(upload_file)) + .route("/download/{file_key}", get(download_file)) .route_layer(DefaultBodyLimit::disable()) .with_state(context); @@ -224,7 +229,8 @@ where let file_key_hash = sp_core::H256::from_slice(&file_key_bytes); // Process the streamed chunks - match process_chunk_stream(&context.file_storage, &file_key_hash, body).await { + let stream = body.into_data_stream(); + match process_chunk_stream(&context.file_storage, &file_key_hash, stream).await { Ok(_) => { if let Err(e) = handle_file_complete(&context, &file_key_hash).await { return ( @@ -274,3 +280,209 @@ where .await; Ok(()) } + +/// HTTP endpoint handler for downloading a file as chunks +/// +/// The stream format is: +/// [ChunkId: 8 bytes (u64, little-endian)][Chunk data: FILE_CHUNK_SIZE bytes]... +/// [ChunkId: 8 bytes (u64, little-endian)][Chunk data: remaining bytes for last chunk] +async fn download_file<FL, FSH, Runtime>( + State(context): State<Context<FL, FSH, Runtime>>, + Path(file_key): Path<String>, +) -> Response +where + FL: FileStorageT, + FSH: ForestStorageHandler + Clone + Send + Sync + 'static, + Runtime: StorageEnableRuntime, +{ + let result = AssertUnwindSafe(async { download_file_inner(context, file_key).await }) + .catch_unwind() + .await; + + match result { + Ok(response) => response, + Err(panic_info) => { + let panic_msg = if let Some(s) = panic_info.downcast_ref::<String>() { + s.clone() + } else if let Some(s) = panic_info.downcast_ref::<&str>() { + s.to_string() + } else { + "Unknown panic".to_string() + }; + + error!( + target: LOG_TARGET, + panic = %panic_msg, + "Panic caught in trusted file transfer download handler" + ); + + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Internal server error: handler panicked".to_string(), + ) + .into_response() + } + } +} + +async fn download_file_inner<FL, FSH, Runtime>( + context: Context<FL, FSH, Runtime>, + file_key_str: String, +) -> Response +where + FL: FileStorageT, + FSH: ForestStorageHandler + Clone + Send + Sync + 'static, + Runtime: StorageEnableRuntime, +{ + // Parse the file key from its hex representation; reject anything that does not + // decode to exactly 32 bytes (H256::from_slice would panic on a wrong-length slice) + let file_key_bytes = hex::decode(&file_key_str).unwrap_or_default(); + let file_key = if file_key_bytes.len() == 32 { + sp_core::H256::from_slice(&file_key_bytes) + } else { + warn!( + target: LOG_TARGET, + file_key = %file_key_str, + "Invalid file key hex format" + ); + return ( + StatusCode::BAD_REQUEST, + format!("Invalid file key format: {}", file_key_str), + ) + .into_response(); + }; + + info!( + target: LOG_TARGET, + file_key = %file_key, + "Download request received for file" + ); + + // Get file metadata to determine chunk count + let file_storage = context.file_storage.read().await; + let metadata = match file_storage.get_metadata(&file_key) { + Ok(Some(metadata)) => metadata, + Ok(None) => { + warn!( + target: LOG_TARGET, + file_key = %file_key, + "File not found" + ); + return ( + StatusCode::NOT_FOUND, + format!("File not found: {:x}", file_key), + ) + .into_response(); + } + Err(e) => { + error!( + target: LOG_TARGET, + file_key = %file_key, + error = %e, + "Failed to get file metadata" + ); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to get file metadata: {}", e), + ) + .into_response(); + } + }; + + let chunks_count = metadata.chunks_count(); + + // Release the read lock
before starting the streaming operation + drop(file_storage); + + info!( + target: LOG_TARGET, + file_key = %file_key, + chunks_count = chunks_count, + "Streaming file chunks" + ); + + // Create a bounded channel for streaming chunks + // Buffer size: 1024 chunks = ~1MB in memory + // TODO: This logic is quite similar to save_file_to_disk. Maybe there's room + // for removing duplication, or for replacing the backend download flow with one + // that uses this endpoint + const QUEUE_BUFFER_SIZE: usize = 1024; + let (tx, rx) = mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(QUEUE_BUFFER_SIZE); + + let file_storage_arc = context.file_storage.clone(); + let batch_size = QUEUE_BUFFER_SIZE as u64; + + // Spawn producer task to read chunks from storage and send through channel + tokio::spawn(async move { + let mut current_chunk: u64 = 0; + + while current_chunk < chunks_count { + let batch_end = std::cmp::min(chunks_count, current_chunk.saturating_add(batch_size)); + + // Read a batch of chunks under a single read lock + let mut batch = Vec::with_capacity((batch_end - current_chunk) as usize); + { + let read_storage = file_storage_arc.read().await; + for chunk_idx in current_chunk..batch_end { + let chunk_id = ChunkId::new(chunk_idx); + + match read_storage.get_chunk(&file_key, &chunk_id) { + Ok(chunk_data) => { + // Encode chunk ID as little-endian u64 + let chunk_id_bytes = chunk_idx.to_le_bytes(); + batch.push((chunk_id_bytes, chunk_data)); + } + Err(e) => { + error!( + target: LOG_TARGET, + file_key = %file_key, + chunk_id = chunk_idx, + error = %e, + "Failed to get chunk" + ); + // Send error and stop producing + let _ = tx + .send(Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error reading chunk {}: {:?}", chunk_idx, e), + ))) + .await; + return; + } + } + } + } // Read lock released here + + // Send the batch, backpressure ensured by bounded channel + for (chunk_id_bytes, chunk_data) in batch { + // Send chunk ID bytes + if tx + .send(Ok(bytes::Bytes::from(chunk_id_bytes.to_vec()))) + .await + .is_err() + { + // Consumer dropped (client disconnected) - stop producing + return; + } + + // Send chunk data bytes + if tx.send(Ok(bytes::Bytes::from(chunk_data))).await.is_err() { + // Consumer dropped (client disconnected) - stop producing + return; + } + } + + current_chunk = batch_end; + } + }); + + // Convert channel receiver to stream and wrap in response body + let stream = ReceiverStream::new(rx); + let body = Body::from_stream(stream); + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/octet-stream") + .body(body) + .unwrap() +} diff --git a/client/src/trusted_file_transfer/utils.rs b/client/src/trusted_file_transfer/utils.rs new file mode 100644 index 000000000..c45e2df18 --- /dev/null +++ b/client/src/trusted_file_transfer/utils.rs @@ -0,0 +1,74 @@ +//! Trusted File Transfer Utilities +//! +//! This module contains utility functions for downloading files from peers +//! via the trusted file transfer server. + +use anyhow::{anyhow, Result}; +use sc_tracing::tracing::info; +use sp_core::H256; + +use crate::{trusted_file_transfer::files::process_chunk_stream, types::FileStorageT}; + +const LOG_TARGET: &str = "trusted-file-transfer-utils"; + +/// Downloads a file from a peer's trusted file transfer server and writes it to local storage.
+/// +/// # Arguments +/// * `peer_url` - The base URL of the peer's trusted file transfer server (e.g., "http://192.168.1.100:7070") +/// * `file_key` - The file key to download +/// * `file_storage` - The file storage to write chunks to +/// +/// # Returns +/// * `Ok(())` if the file was downloaded and stored successfully +/// * `Err` if there was an error downloading or storing the file +pub async fn download_file_from_peer<FL>( + peer_url: &str, + file_key: &H256, + file_storage: &tokio::sync::RwLock<FL>, +) -> Result<()> +where + FL: FileStorageT, +{ + info!( + target: LOG_TARGET, + file_key = %file_key, + peer_url = %peer_url, + "Downloading file from peer" + ); + + // Build the download URL + let download_url = format!("{}/download/{:x}", peer_url.trim_end_matches('/'), file_key); + + // Make HTTP GET request to download the file + let client = reqwest::Client::new(); + let response = client + .get(&download_url) + .send() + .await + .map_err(|e| anyhow!("Failed to send download request: {}", e))?; + + if !response.status().is_success() { + let status = response.status(); + let error_body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(anyhow!( + "Download request failed with status {}: {}", + status, + error_body + )); + } + + let bytes_stream = response.bytes_stream(); + process_chunk_stream(file_storage, file_key, bytes_stream).await?; + + info!( + target: LOG_TARGET, + file_key = %file_key, + "Successfully downloaded and stored file from peer" + ); + + Ok(()) +} diff --git a/node/src/service.rs b/node/src/service.rs index 7b734dcbc..2407891f8 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -345,20 +345,32 @@ where } } - if *trusted_file_transfer_server { + // Prepare trusted file transfer server URL for leader registration + let trusted_file_transfer_url = if *trusted_file_transfer_server { let file_transfer_config = shc_client::trusted_file_transfer::server::Config { host: trusted_file_transfer_server_host .clone() .unwrap_or_else(|| "127.0.0.1".to_string()), port: trusted_file_transfer_server_port.unwrap_or(7070), }; - builder.with_trusted_file_transfer_server(file_transfer_config); - } + builder.with_trusted_file_transfer_server(file_transfer_config.clone()); + Some(format!( + "http://{}:{}", + file_transfer_config.host, file_transfer_config.port + )) + } else { + None + }; if let Some(c) = blockchain_service { let peer_id = network.local_peer_id().to_bytes(); let mut c = c.clone(); c.peer_id = Some(peer_id); + + if c.advertised_trusted_file_transfer_server_url.is_none() { + c.advertised_trusted_file_transfer_server_url = trusted_file_transfer_url; + } + builder.with_blockchain_service_config(c); }
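Editor's note (an illustrative sketch, not part of the patch): the `/download/{file_key}` producer above and `process_chunk_stream` consumer share one wire format, an 8-byte little-endian chunk id followed by the chunk data. The helpers below are hypothetical stand-ins (the crate's real implementation is `encode_chunk_with_id` / `read_chunk_with_id_from_buffer`); they just spell out the frame layout:

```rust
/// Encode one frame: an 8-byte little-endian chunk id followed by the chunk data.
fn encode_frame(chunk_id: u64, chunk_data: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + chunk_data.len());
    frame.extend_from_slice(&chunk_id.to_le_bytes()); // CHUNK_ID_SIZE = 8 bytes
    frame.extend_from_slice(chunk_data);
    frame
}

/// Decode one frame back into (chunk id, chunk data). Returns None if the
/// buffer is too short to contain a chunk id.
fn decode_frame(frame: &[u8]) -> Option<(u64, &[u8])> {
    let id_bytes: [u8; 8] = frame.get(..8)?.try_into().ok()?;
    Some((u64::from_le_bytes(id_bytes), &frame[8..]))
}

#[test]
fn frame_round_trips() {
    let frame = encode_frame(42, b"chunk bytes");
    assert_eq!(decode_frame(&frame), Some((42, &b"chunk bytes"[..])));
}
```

All frames carry a full `FILE_CHUNK_SIZE` of data except the last one, which carries the remainder; that is why `process_chunk_stream` only pops frames from its buffer once `CHUNK_ID_SIZE + FILE_CHUNK_SIZE` bytes have accumulated and flushes the tail after the stream ends.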