diff --git a/Cargo.lock b/Cargo.lock index 2f668140c..91e238a0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11963,6 +11963,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "typed-store", + "urlencoding", "utoipa", "utoipa-redoc", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 0df4dfef6..8bcb1359e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ tracing-opentelemetry = { version = "=0.28.0", default-features = false } tracing-subscriber = { version = "0.3.19", default-features = false, features = ["fmt"] } typed-store = { git = "https://github.com/MystenLabs/sui", tag = "testnet-v1.45.2" } url = "2.5.4" +urlencoding = "2.1.3" utoipa = { version = "5" } utoipa-redoc = { version = "6.0", features = ["axum"] } uuid = { version = "1.16.0", features = ["fast-rng", "macro-diagnostics", "v7"] } diff --git a/crates/walrus-service/Cargo.toml b/crates/walrus-service/Cargo.toml index 71d55ef5a..5c210ee22 100644 --- a/crates/walrus-service/Cargo.toml +++ b/crates/walrus-service/Cargo.toml @@ -150,6 +150,7 @@ tracing.workspace = true tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true typed-store = { workspace = true, optional = true } +urlencoding.workspace = true utoipa = { workspace = true, features = ["axum_extras", "macros", "yaml"] } utoipa-redoc.workspace = true uuid.workspace = true diff --git a/crates/walrus-service/bin/node.rs b/crates/walrus-service/bin/node.rs index c2406c9b5..e17353ddf 100644 --- a/crates/walrus-service/bin/node.rs +++ b/crates/walrus-service/bin/node.rs @@ -1152,7 +1152,11 @@ impl StorageNodeRuntime { StorageNode::builder() .with_system_event_manager(event_manager) .with_config_loader(config_loader) - .build(node_config, metrics_runtime.registry.clone()), + .build( + node_config, + metrics_runtime.registry.clone(), + metrics_runtime.tracing_handle.clone(), + ), )?, ); diff --git a/crates/walrus-service/src/common/telemetry.rs b/crates/walrus-service/src/common/telemetry.rs index 09c7cdb4e..7a2ba68bf 100644 --- a/crates/walrus-service/src/common/telemetry.rs +++ b/crates/walrus-service/src/common/telemetry.rs @@ -37,6 +37,7 @@ use prometheus::{ IntGaugeVec, Opts, }; +use telemetry_subscribers::TracingHandle; use tokio::time::Instant; use tower_http::trace::{MakeSpan, OnResponse}; use tracing::{field, Span}; @@ -565,3 +566,24 @@ impl From for Box { Box::new(value.0) } } + +pub struct WalrusTracingHandle(pub Arc); + +impl std::ops::Deref for WalrusTracingHandle { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for Arc { + fn from(value: WalrusTracingHandle) -> Self { + value.0 + } +} + +impl std::fmt::Debug for WalrusTracingHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "WalrusTracingHandle") + } +} diff --git a/crates/walrus-service/src/common/utils.rs b/crates/walrus-service/src/common/utils.rs index 31db7f6c2..7c8c5ccc8 100644 --- a/crates/walrus-service/src/common/utils.rs +++ b/crates/walrus-service/src/common/utils.rs @@ -251,7 +251,8 @@ pub struct MetricsAndLoggingRuntime { /// The Prometheus registry. pub registry: Registry, _telemetry_guards: TelemetryGuards, - _tracing_handle: TracingHandle, + /// The tracing handle. + pub tracing_handle: Arc, /// The runtime for metrics and logging. // INV: Runtime must be dropped last. pub runtime: Option, @@ -291,7 +292,7 @@ impl MetricsAndLoggingRuntime { runtime, registry: walrus_registry, _telemetry_guards: telemetry_guards, - _tracing_handle: tracing_handle, + tracing_handle: Arc::new(tracing_handle), }) } } diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index 848b8a46e..de8d922f3 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -5,6 +5,7 @@ use std::{ future::Future, + net::SocketAddr, num::{NonZero, NonZeroU16}, pin::Pin, str::FromStr, @@ -45,6 +46,7 @@ use sui_macros::fail_point_if; use sui_macros::{fail_point_arg, fail_point_async}; use sui_types::{base_types::ObjectID, event::EventID}; use system_events::{CompletableHandle, EventHandle}; +use telemetry_subscribers::TracingHandle; use thread_pool::ThreadPoolBuilder; use tokio::{select, sync::watch, time::Instant}; use tokio_util::sync::CancellationToken; @@ -155,6 +157,7 @@ use crate::{ common::{ active_committees::ActiveCommittees, config::SuiConfig, + telemetry::WalrusTracingHandle, utils::should_reposition_cursor, }, utils::ShardDiffCalculator, @@ -276,6 +279,9 @@ pub trait ServiceState { /// Returns the node health information of this ServiceState. fn health_info(&self, detailed: bool) -> ServiceHealthInfo; + /// Returns the server's bound address. + fn rest_api_address(&self) -> SocketAddr; + /// Returns whether the sliver is stored in the shard. fn sliver_status( &self, @@ -289,6 +295,9 @@ pub trait ServiceState { public_key: PublicKey, signed_request: SignedSyncShardRequest, ) -> impl Future> + Send; + + /// Updates the log directive for the node. + fn update_log_directive>(&self, directive: S) -> Result<(), anyhow::Error>; } /// Builder to construct a [`StorageNode`]. @@ -367,6 +376,7 @@ impl StorageNodeBuilder { self, config: &StorageNodeConfig, metrics_registry: Registry, + tracing_handle: Arc, ) -> Result { let protocol_key_pair = config .protocol_key_pair @@ -459,15 +469,16 @@ impl StorageNodeBuilder { num_checkpoints_per_blob: self.num_checkpoints_per_blob, }; - StorageNode::new( + StorageNode::new(StorageNodeArgs { config, event_manager, committee_service, contract_service, - &metrics_registry, - self.config_loader, + registry: &metrics_registry, + tracing_handle: WalrusTracingHandle(tracing_handle), + config_loader: self.config_loader, node_params, - ) + }) .await } } @@ -508,6 +519,8 @@ pub struct StorageNodeInner { node_capability: ObjectID, blob_retirement_notifier: Arc, symbol_service: RecoverySymbolService, + tracing_handle: WalrusTracingHandle, + rest_api_address: SocketAddr, } /// Parameters for configuring and initializing a node. @@ -523,15 +536,39 @@ pub struct NodeParameters { num_checkpoints_per_blob: Option, } +/// Arguments for creating a new storage node. +#[derive(Debug)] +pub struct StorageNodeArgs<'a> { + /// Configuration for the storage node. + pub config: &'a StorageNodeConfig, + /// Event manager for handling system events. + pub event_manager: Box, + /// Service for managing committee-related operations. + pub committee_service: Arc, + /// Service for managing system contracts. + pub contract_service: Arc, + /// Prometheus registry for metrics. + pub registry: &'a Registry, + /// Handle for tracing operations. + pub tracing_handle: WalrusTracingHandle, + /// Optional configuration loader. + pub config_loader: Option>, + /// Additional node parameters. + pub node_params: NodeParameters, +} + impl StorageNode { async fn new( - config: &StorageNodeConfig, - event_manager: Box, - committee_service: Arc, - contract_service: Arc, - registry: &Registry, - config_loader: Option>, - node_params: NodeParameters, + StorageNodeArgs { + config, + event_manager, + committee_service, + contract_service, + registry, + tracing_handle, + config_loader, + node_params, + }: StorageNodeArgs<'_>, ) -> Result { let start_time = Instant::now(); let node_capability = contract_service @@ -600,6 +637,8 @@ impl StorageNode { .build_bounded(), ), encoding_config, + tracing_handle, + rest_api_address: config.rest_api_address, }); blocklist.start_refresh_task(); @@ -2111,6 +2150,10 @@ impl ServiceState for StorageNode { self.inner.health_info(detailed) } + fn rest_api_address(&self) -> SocketAddr { + self.inner.rest_api_address() + } + fn sliver_status( &self, blob_id: &BlobId, @@ -2126,6 +2169,10 @@ impl ServiceState for StorageNode { ) -> impl Future> + Send { self.inner.sync_shard(public_key, signed_request) } + + fn update_log_directive>(&self, directive: S) -> Result<(), anyhow::Error> { + self.inner.update_log_directive(directive) + } } impl ServiceState for StorageNodeInner { @@ -2526,6 +2573,17 @@ impl ServiceState for StorageNodeInner { .handle_sync_shard_request(request, self.current_epoch()) .await } + + fn update_log_directive>(&self, directive: S) -> Result<(), anyhow::Error> { + self.tracing_handle + .update_log(directive) + .map_err(|e| anyhow::anyhow!("{}", e))?; + Ok(()) + } + + fn rest_api_address(&self) -> SocketAddr { + self.rest_api_address + } } #[tracing::instrument(skip_all, err)] diff --git a/crates/walrus-service/src/node/server.rs b/crates/walrus-service/src/node/server.rs index 1dbcbab47..8d745e712 100644 --- a/crates/walrus-service/src/node/server.rs +++ b/crates/walrus-service/src/node/server.rs @@ -386,6 +386,10 @@ where .route(routes::BLOB_STATUS_ENDPOINT, get(routes::get_blob_status)) .route(routes::HEALTH_ENDPOINT, get(routes::health_info)) .route(routes::SYNC_SHARD_ENDPOINT, post(routes::sync_shard)) + .route( + routes::LOG_DIRECTIVE_ENDPOINT, + post(routes::set_log_directive), + ) } } @@ -424,6 +428,8 @@ fn to_pkcs8_key_pair(keypair: &NetworkKeyPair) -> RcGenKeyPair { #[cfg(test)] mod tests { + use std::net::{IpAddr, Ipv4Addr}; + use anyhow::anyhow; use axum::http::StatusCode; use fastcrypto::traits::KeyPair; @@ -689,6 +695,14 @@ mod tests { ) -> Result { Ok(SyncShardResponse::V1(vec![])) } + + fn rest_api_address(&self) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080) + } + + fn update_log_directive>(&self, _directive: S) -> Result<(), anyhow::Error> { + Ok(()) + } } async fn start_rest_api_with_config( diff --git a/crates/walrus-service/src/node/server/openapi.rs b/crates/walrus-service/src/node/server/openapi.rs index d9b447b2d..31abc0a2f 100644 --- a/crates/walrus-service/src/node/server/openapi.rs +++ b/crates/walrus-service/src/node/server/openapi.rs @@ -19,7 +19,7 @@ pub(super) const GROUP_READING_BLOBS: &str = "Reading Blobs"; pub(super) const GROUP_RECOVERY: &str = "Recovery"; pub(super) const GROUP_STATUS: &str = "Status"; pub(super) const GROUP_SYNC_SHARD: &str = "Sync Shard"; - +pub(super) const GROUP_LOG_DIRECTIVE: &str = "Log Directive"; #[derive(utoipa::OpenApi)] #[openapi( paths( @@ -34,6 +34,7 @@ pub(super) const GROUP_SYNC_SHARD: &str = "Sync Shard"; routes::list_recovery_symbols, routes::put_metadata, routes::put_sliver, + routes::set_log_directive, ), components(schemas( EpochSchema, diff --git a/crates/walrus-service/src/node/server/routes.rs b/crates/walrus-service/src/node/server/routes.rs index b681e6e8f..377c2aa24 100644 --- a/crates/walrus-service/src/node/server/routes.rs +++ b/crates/walrus-service/src/node/server/routes.rs @@ -1,10 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{num::NonZeroU16, sync::Arc}; +use std::{net::SocketAddr, num::NonZeroU16, sync::Arc}; use axum::{ - extract::{Path, Query, State}, + extract::{ConnectInfo, Path, Query, State}, http::StatusCode, response::{IntoResponse, Response}, }; @@ -90,6 +90,7 @@ pub const INCONSISTENCY_PROOF_ENDPOINT: &str = pub const BLOB_STATUS_ENDPOINT: &str = "/v1/blobs/{blob_id}/status"; pub const HEALTH_ENDPOINT: &str = "/v1/health"; pub const SYNC_SHARD_ENDPOINT: &str = "/v1/migrate/sync_shard"; +pub const LOG_DIRECTIVE_ENDPOINT: &str = "/v1/log/directive"; /// Convenience trait to apply bounds on the ServiceState. trait SyncServiceState: ServiceState + Send + Sync + 'static {} @@ -686,3 +687,100 @@ pub async fn sync_shard( ) -> Result> { Ok(Bcs(state.sync_shard(public_key, signed_request).await?).into_response()) } + +#[derive(Debug, Clone, serde::Deserialize, utoipa::IntoParams)] +#[serde(rename_all = "camelCase")] +pub struct LogDirectiveQuery { + /// The log directive to set (e.g. "RUST_LOG=info;walrus=debug") + directive: String, +} + +pub enum LogDirectiveError { + InvalidUrlEncoding(String), + UpdateFailed(String), + Unauthorized(String), +} + +impl std::fmt::Display for LogDirectiveError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidUrlEncoding(e) => write!(f, "Invalid URL encoding: {}", e), + Self::UpdateFailed(e) => write!(f, "Failed to update log directive: {}", e), + Self::Unauthorized(e) => write!(f, "{}", e), + } + } +} + +impl crate::common::api::RestApiError for LogDirectiveError { + fn status_code(&self) -> walrus_sdk::api::errors::StatusCode { + match self { + Self::InvalidUrlEncoding(_) => walrus_sdk::api::errors::StatusCode::InvalidArgument, + Self::UpdateFailed(_) => walrus_sdk::api::errors::StatusCode::Internal, + Self::Unauthorized(_) => walrus_sdk::api::errors::StatusCode::Unauthenticated, + } + } + + fn domain(&self) -> String { + "log_directive".to_string() + } + + fn reason(&self) -> String { + self.to_string() + } + + fn response_descriptions() -> std::collections::HashMap> { + let mut map = std::collections::HashMap::new(); + map.insert( + axum::http::StatusCode::BAD_REQUEST, + vec!["Invalid URL encoding".to_string()], + ); + map.insert( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + vec!["Failed to update log directive".to_string()], + ); + map.insert( + axum::http::StatusCode::UNAUTHORIZED, + vec!["This endpoint can only be accessed from localhost".to_string()], + ); + map + } + + fn add_details(&self, _status: &mut walrus_sdk::api::errors::Status) {} +} + +/// Change the logging verbosity level at runtime. +#[tracing::instrument(skip_all)] +#[utoipa::path( + post, + path = LOG_DIRECTIVE_ENDPOINT, + tag = openapi::GROUP_LOG_DIRECTIVE, + params(LogDirectiveQuery), + responses( + (status = 200, description = "Log directive updated successfully"), + (status = 400, description = "Invalid log directive"), + (status = 500, description = "Failed to update log directive") + ) +)] +pub async fn set_log_directive( + State(state): State>, + ConnectInfo(addr): ConnectInfo, + Query(query): Query, +) -> Result, OrRejection> { + // Check if the request is from the server's bound IP + if !addr.ip().is_loopback() && addr.ip() != state.rest_api_address().ip() { + return Err(OrRejection::Err(LogDirectiveError::Unauthorized( + "This endpoint can only be accessed from localhost or the server's own IP".to_string(), + ))); + } + + let directive = urlencoding::decode(&query.directive) + .map_err(|e| OrRejection::Err(LogDirectiveError::InvalidUrlEncoding(e.to_string())))?; + + tracing::info!("Setting log directive: {}", directive); + state + .update_log_directive(directive.as_ref()) + .map_err(|e| OrRejection::Err(LogDirectiveError::UpdateFailed(e.to_string())))?; + Ok(ApiSuccess::ok( + "Log directive updated successfully".to_string(), + )) +} diff --git a/crates/walrus-service/src/test_utils.rs b/crates/walrus-service/src/test_utils.rs index 0e3f73c72..57b2840cf 100644 --- a/crates/walrus-service/src/test_utils.rs +++ b/crates/walrus-service/src/test_utils.rs @@ -24,6 +24,7 @@ use futures::{future, stream::FuturesUnordered, StreamExt}; use prometheus::Registry; use sui_macros::nondeterministic; use sui_types::base_types::ObjectID; +use telemetry_subscribers::TelemetryConfig; use tempfile::TempDir; #[cfg(msim)] use tokio::sync::RwLock; @@ -902,7 +903,7 @@ impl StorageNodeHandleBuilder { ) .await?; } - + let (_, tracing_handle) = TelemetryConfig::init(TelemetryConfig::default()); let metrics_registry = Registry::default(); let mut builder = StorageNode::builder(); if let Some(num_checkpoints_per_blob) = self.num_checkpoints_per_blob { @@ -915,7 +916,7 @@ impl StorageNodeHandleBuilder { ))) .with_committee_service(committee_service) .with_system_contract_service(contract_service) - .build(&config, metrics_registry.clone()) + .build(&config, metrics_registry.clone(), Arc::new(tracing_handle)) .await?; let node = Arc::new(node);