Skip to content

feat: Allow updating logging verbosity dynamically at runtime #1829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ tracing-opentelemetry = { version = "=0.28.0", default-features = false }
tracing-subscriber = { version = "0.3.19", default-features = false, features = ["fmt"] }
typed-store = { git = "https://github.com/MystenLabs/sui", tag = "testnet-v1.45.2" }
url = "2.5.4"
urlencoding = "2.1.3"
utoipa = { version = "5" }
utoipa-redoc = { version = "6.0", features = ["axum"] }
uuid = { version = "1.16.0", features = ["fast-rng", "macro-diagnostics", "v7"] }
Expand Down
1 change: 1 addition & 0 deletions crates/walrus-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
typed-store = { workspace = true, optional = true }
urlencoding.workspace = true
utoipa = { workspace = true, features = ["axum_extras", "macros", "yaml"] }
utoipa-redoc.workspace = true
uuid.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion crates/walrus-service/bin/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,11 @@ impl StorageNodeRuntime {
StorageNode::builder()
.with_system_event_manager(event_manager)
.with_config_loader(config_loader)
.build(node_config, metrics_runtime.registry.clone()),
.build(
node_config,
metrics_runtime.registry.clone(),
metrics_runtime.tracing_handle.clone(),
),
)?,
);

Expand Down
22 changes: 22 additions & 0 deletions crates/walrus-service/src/common/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use prometheus::{
IntGaugeVec,
Opts,
};
use telemetry_subscribers::TracingHandle;
use tokio::time::Instant;
use tower_http::trace::{MakeSpan, OnResponse};
use tracing::{field, Span};
Expand Down Expand Up @@ -565,3 +566,24 @@ impl From<CurrentEpochStateMetric> for Box<dyn Collector> {
Box::new(value.0)
}
}

pub struct WalrusTracingHandle(pub Arc<TracingHandle>);

impl std::ops::Deref for WalrusTracingHandle {
type Target = Arc<TracingHandle>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<WalrusTracingHandle> for Arc<TracingHandle> {
fn from(value: WalrusTracingHandle) -> Self {
value.0
}
}

impl std::fmt::Debug for WalrusTracingHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "WalrusTracingHandle")
}
}
5 changes: 3 additions & 2 deletions crates/walrus-service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ pub struct MetricsAndLoggingRuntime {
/// The Prometheus registry.
pub registry: Registry,
_telemetry_guards: TelemetryGuards,
_tracing_handle: TracingHandle,
/// The tracing handle.
pub tracing_handle: Arc<TracingHandle>,
/// The runtime for metrics and logging.
// INV: Runtime must be dropped last.
pub runtime: Option<Runtime>,
Expand Down Expand Up @@ -291,7 +292,7 @@ impl MetricsAndLoggingRuntime {
runtime,
registry: walrus_registry,
_telemetry_guards: telemetry_guards,
_tracing_handle: tracing_handle,
tracing_handle: Arc::new(tracing_handle),
})
}
}
Expand Down
80 changes: 69 additions & 11 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use std::{
future::Future,
net::SocketAddr,
num::{NonZero, NonZeroU16},
pin::Pin,
str::FromStr,
Expand Down Expand Up @@ -45,6 +46,7 @@ use sui_macros::fail_point_if;
use sui_macros::{fail_point_arg, fail_point_async};
use sui_types::{base_types::ObjectID, event::EventID};
use system_events::{CompletableHandle, EventHandle};
use telemetry_subscribers::TracingHandle;
use thread_pool::ThreadPoolBuilder;
use tokio::{select, sync::watch, time::Instant};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -155,6 +157,7 @@ use crate::{
common::{
active_committees::ActiveCommittees,
config::SuiConfig,
telemetry::WalrusTracingHandle,
utils::should_reposition_cursor,
},
utils::ShardDiffCalculator,
Expand Down Expand Up @@ -276,6 +279,9 @@ pub trait ServiceState {
/// Returns the node health information of this ServiceState.
fn health_info(&self, detailed: bool) -> ServiceHealthInfo;

/// Returns the server's bound address.
fn rest_api_address(&self) -> SocketAddr;

/// Returns whether the sliver is stored in the shard.
fn sliver_status<A: EncodingAxis>(
&self,
Expand All @@ -289,6 +295,9 @@ pub trait ServiceState {
public_key: PublicKey,
signed_request: SignedSyncShardRequest,
) -> impl Future<Output = Result<SyncShardResponse, SyncShardServiceError>> + Send;

/// Updates the log directive for the node.
fn update_log_directive<S: AsRef<str>>(&self, directive: S) -> Result<(), anyhow::Error>;
}

/// Builder to construct a [`StorageNode`].
Expand Down Expand Up @@ -367,6 +376,7 @@ impl StorageNodeBuilder {
self,
config: &StorageNodeConfig,
metrics_registry: Registry,
tracing_handle: Arc<TracingHandle>,
) -> Result<StorageNode, anyhow::Error> {
let protocol_key_pair = config
.protocol_key_pair
Expand Down Expand Up @@ -459,15 +469,16 @@ impl StorageNodeBuilder {
num_checkpoints_per_blob: self.num_checkpoints_per_blob,
};

StorageNode::new(
StorageNode::new(StorageNodeArgs {
config,
event_manager,
committee_service,
contract_service,
&metrics_registry,
self.config_loader,
registry: &metrics_registry,
tracing_handle: WalrusTracingHandle(tracing_handle),
config_loader: self.config_loader,
node_params,
)
})
.await
}
}
Expand Down Expand Up @@ -508,6 +519,8 @@ pub struct StorageNodeInner {
node_capability: ObjectID,
blob_retirement_notifier: Arc<BlobRetirementNotifier>,
symbol_service: RecoverySymbolService,
tracing_handle: WalrusTracingHandle,
rest_api_address: SocketAddr,
}

/// Parameters for configuring and initializing a node.
Expand All @@ -523,15 +536,39 @@ pub struct NodeParameters {
num_checkpoints_per_blob: Option<u32>,
}

/// Arguments for creating a new storage node.
#[derive(Debug)]
pub struct StorageNodeArgs<'a> {
/// Configuration for the storage node.
pub config: &'a StorageNodeConfig,
/// Event manager for handling system events.
pub event_manager: Box<dyn EventManager>,
/// Service for managing committee-related operations.
pub committee_service: Arc<dyn CommitteeService>,
/// Service for managing system contracts.
pub contract_service: Arc<dyn SystemContractService>,
/// Prometheus registry for metrics.
pub registry: &'a Registry,
/// Handle for tracing operations.
pub tracing_handle: WalrusTracingHandle,
/// Optional configuration loader.
pub config_loader: Option<Arc<dyn ConfigLoader>>,
/// Additional node parameters.
pub node_params: NodeParameters,
}

impl StorageNode {
async fn new(
config: &StorageNodeConfig,
event_manager: Box<dyn EventManager>,
committee_service: Arc<dyn CommitteeService>,
contract_service: Arc<dyn SystemContractService>,
registry: &Registry,
config_loader: Option<Arc<dyn ConfigLoader>>,
node_params: NodeParameters,
StorageNodeArgs {
config,
event_manager,
committee_service,
contract_service,
registry,
tracing_handle,
config_loader,
node_params,
}: StorageNodeArgs<'_>,
) -> Result<Self, anyhow::Error> {
let start_time = Instant::now();
let node_capability = contract_service
Expand Down Expand Up @@ -600,6 +637,8 @@ impl StorageNode {
.build_bounded(),
),
encoding_config,
tracing_handle,
rest_api_address: config.rest_api_address,
});

blocklist.start_refresh_task();
Expand Down Expand Up @@ -2111,6 +2150,10 @@ impl ServiceState for StorageNode {
self.inner.health_info(detailed)
}

fn rest_api_address(&self) -> SocketAddr {
self.inner.rest_api_address()
}

fn sliver_status<A: EncodingAxis>(
&self,
blob_id: &BlobId,
Expand All @@ -2126,6 +2169,10 @@ impl ServiceState for StorageNode {
) -> impl Future<Output = Result<SyncShardResponse, SyncShardServiceError>> + Send {
self.inner.sync_shard(public_key, signed_request)
}

fn update_log_directive<S: AsRef<str>>(&self, directive: S) -> Result<(), anyhow::Error> {
self.inner.update_log_directive(directive)
}
}

impl ServiceState for StorageNodeInner {
Expand Down Expand Up @@ -2526,6 +2573,17 @@ impl ServiceState for StorageNodeInner {
.handle_sync_shard_request(request, self.current_epoch())
.await
}

fn update_log_directive<S: AsRef<str>>(&self, directive: S) -> Result<(), anyhow::Error> {
self.tracing_handle
.update_log(directive)
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(())
}

fn rest_api_address(&self) -> SocketAddr {
self.rest_api_address
}
}

#[tracing::instrument(skip_all, err)]
Expand Down
14 changes: 14 additions & 0 deletions crates/walrus-service/src/node/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ where
.route(routes::BLOB_STATUS_ENDPOINT, get(routes::get_blob_status))
.route(routes::HEALTH_ENDPOINT, get(routes::health_info))
.route(routes::SYNC_SHARD_ENDPOINT, post(routes::sync_shard))
.route(
routes::LOG_DIRECTIVE_ENDPOINT,
post(routes::set_log_directive),
)
}
}

Expand Down Expand Up @@ -424,6 +428,8 @@ fn to_pkcs8_key_pair(keypair: &NetworkKeyPair) -> RcGenKeyPair {

#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};

use anyhow::anyhow;
use axum::http::StatusCode;
use fastcrypto::traits::KeyPair;
Expand Down Expand Up @@ -689,6 +695,14 @@ mod tests {
) -> Result<SyncShardResponse, SyncShardServiceError> {
Ok(SyncShardResponse::V1(vec![]))
}

fn rest_api_address(&self) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080)
}

fn update_log_directive<S: AsRef<str>>(&self, _directive: S) -> Result<(), anyhow::Error> {
Ok(())
}
}

async fn start_rest_api_with_config(
Expand Down
3 changes: 2 additions & 1 deletion crates/walrus-service/src/node/server/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(super) const GROUP_READING_BLOBS: &str = "Reading Blobs";
pub(super) const GROUP_RECOVERY: &str = "Recovery";
pub(super) const GROUP_STATUS: &str = "Status";
pub(super) const GROUP_SYNC_SHARD: &str = "Sync Shard";

pub(super) const GROUP_LOG_DIRECTIVE: &str = "Log Directive";
#[derive(utoipa::OpenApi)]
#[openapi(
paths(
Expand All @@ -34,6 +34,7 @@ pub(super) const GROUP_SYNC_SHARD: &str = "Sync Shard";
routes::list_recovery_symbols,
routes::put_metadata,
routes::put_sliver,
routes::set_log_directive,
),
components(schemas(
EpochSchema,
Expand Down
Loading
Loading