Skip to content

Commit 1cf0eef

Browse files
prkpndyheemankvMohiiit
authored
chore: refactor/fix logs for orchestrator (#858)
Co-authored-by: Heemank Verma <[email protected]> Co-authored-by: Mohit Dhattarwal <[email protected]>
1 parent 6a1b0df commit 1cf0eef

File tree

32 files changed

+665
-364
lines changed

32 files changed

+665
-364
lines changed

madara/crates/client/gateway/client/src/methods.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ impl GatewayProvider {
6565
.with_block_id(&block_id);
6666

6767
request.send_get::<ProviderBlock>().await
68-
}).await
68+
})
69+
.await
6970
}
7071

7172
pub async fn get_preconfirmed_block(&self, block_number: u64) -> Result<ProviderBlockPreConfirmed, SequencerError> {
@@ -76,7 +77,8 @@ impl GatewayProvider {
7677
.with_block_id(&BlockId::Number(block_number));
7778

7879
request.send_get::<ProviderBlockPreConfirmed>().await
79-
}).await
80+
})
81+
.await
8082
}
8183

8284
pub async fn get_header(&self, block_id: BlockId) -> Result<ProviderBlockHeader, SequencerError> {
@@ -88,7 +90,8 @@ impl GatewayProvider {
8890
.add_param("headerOnly", "true");
8991

9092
request.send_get::<ProviderBlockHeader>().await
91-
}).await
93+
})
94+
.await
9295
}
9396

9497
pub async fn get_state_update(&self, block_id: BlockId) -> Result<ProviderStateUpdate, SequencerError> {
@@ -99,7 +102,8 @@ impl GatewayProvider {
99102
.with_block_id(&block_id);
100103

101104
request.send_get::<ProviderStateUpdate>().await
102-
}).await
105+
})
106+
.await
103107
}
104108

105109
pub async fn get_block_bouncer_weights(&self, block_number: u64) -> Result<BouncerWeights, SequencerError> {
@@ -123,7 +127,8 @@ impl GatewayProvider {
123127
.add_param(Cow::from("includeBlock"), "true");
124128

125129
request.send_get::<ProviderStateUpdateWithBlock>().await
126-
}).await
130+
})
131+
.await
127132
}
128133

129134
pub async fn get_signature(&self, block_id: BlockId) -> Result<ProviderBlockSignature, SequencerError> {
@@ -138,7 +143,8 @@ impl GatewayProvider {
138143
.with_block_id(&block_id);
139144

140145
request.send_get::<ProviderBlockSignature>().await
141-
}).await
146+
})
147+
.await
142148
}
143149

144150
pub async fn get_class_by_hash(
@@ -165,7 +171,8 @@ impl GatewayProvider {
165171
let err = serde::de::Error::custom("Unknown contract type".to_string());
166172
Err(SequencerError::DeserializeBody { serde_error: err })
167173
}
168-
}).await
174+
})
175+
.await
169176
}
170177

171178
async fn add_transaction<T>(&self, transaction: UserTransaction) -> Result<T, SequencerError>
@@ -421,6 +428,7 @@ mod tests {
421428

422429
#[rstest]
423430
#[tokio::test]
431+
#[ignore = "Madara is incompatible with 0.14.1 feeder gateway changes, specifically migrated_compiled_classes."]
424432
async fn get_preconfirmed_block(client_testnet_fixture: GatewayProvider) {
425433
let latest_block_number =
426434
client_testnet_fixture.get_header(BlockId::Tag(BlockTag::Latest)).await.unwrap().block_number;

orchestrator/crates/settlement-clients/ethereum/src/lib.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use lazy_static::lazy_static;
4343
use mockall::automock;
4444
use reqwest::Client;
4545
use tokio::time::sleep;
46-
use tracing::{info, warn};
46+
use tracing::{error, info, warn};
4747

4848
// For more details on state update, refer to the core contract logic
4949
// https://github.com/starkware-libs/cairo-lang/blob/master/src/starkware/starknet/solidity/Output.sol
@@ -263,11 +263,13 @@ impl SettlementClient for EthereumSettlementClient {
263263
state_diff: Vec<Vec<u8>>,
264264
_nonce: u64,
265265
) -> Result<String> {
266-
tracing::info!(
266+
// TODO(prakhar,20/11/2025): Update the logs to add custom formatter - https://github.com/madara-alliance/madara/blob/d2a1e8050a3d01ccf398f57616cbc4fb6386aaa6/madara/crates/client/analytics/src/formatter.rs#L288
267+
info!(
267268
log_type = "starting",
268269
category = "update_state",
269-
function_type = "blobs",
270-
"Updating state with blobs."
270+
state_diff_len = %state_diff.len(),
271+
program_output_len = %program_output.len(),
272+
"Updating state with blob"
271273
);
272274

273275
let mut mul_factor = GAS_PRICE_MULTIPLIER_START;
@@ -279,8 +281,15 @@ impl SettlementClient for EthereumSettlementClient {
279281
Result::Ok(pending_transaction) => pending_transaction,
280282
Err(e) => match e {
281283
SendTransactionError::ReplacementTransactionUnderpriced(e) => {
282-
warn!("Failed to send the state update transaction: {:?} with {:?} multiplication factor, trying again...", e, mul_factor);
283-
mul_factor = self.get_next_mul_factor(mul_factor)?;
284+
let next_mul_factor = self.get_next_mul_factor(mul_factor)?;
285+
warn!(
286+
current_multiplier = %mul_factor,
287+
next_multiplier = %next_mul_factor,
288+
max_multiplier = %self.max_gas_price_mul_factor,
289+
error = ?e,
290+
"Transaction rejected due to low gas price, retrying with higher multiplier"
291+
);
292+
mul_factor = next_mul_factor;
284293
continue;
285294
}
286295
SendTransactionError::Other(_) => {
@@ -289,24 +298,29 @@ impl SettlementClient for EthereumSettlementClient {
289298
},
290299
};
291300

292-
tracing::info!(
301+
info!(
293302
log_type = "completed",
294303
category = "update_state",
295304
function_type = "blobs",
296-
"State updated with blobs."
305+
tx_hash = %pending_transaction.tx_hash(),
306+
"State update transaction submitted to Ethereum with blobs"
297307
);
298308

299-
tracing::warn!("⏳ Waiting for txn finality...");
300-
301309
// Waiting for transaction finality
302310
let res = self.wait_for_tx_finality(&pending_transaction.tx_hash().to_string()).await?;
303311

304312
match res {
305313
Some(_) => {
306-
tracing::info!("✅ Txn hash : {:?} finalized", pending_transaction.tx_hash().to_string());
314+
info!(
315+
tx_hash = %pending_transaction.tx_hash(),
316+
"Transaction finalized successfully"
317+
);
307318
}
308319
None => {
309-
tracing::error!("❌ Txn hash: {:?} not finalised", pending_transaction.tx_hash().to_string());
320+
error!(
321+
tx_hash = %pending_transaction.tx_hash(),
322+
"Transaction not finalized"
323+
);
310324
}
311325
}
312326
return Ok(pending_transaction.tx_hash().to_string());

orchestrator/crates/settlement-clients/starknet/src/lib.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use starknet::providers::jsonrpc::HttpTransport;
2424
use starknet::providers::{JsonRpcClient, Provider};
2525
use starknet::signers::{LocalWallet, SigningKey};
2626
use tokio::time::{sleep, Duration};
27+
use tracing::info;
2728

2829
use crate::conversion::{slice_slice_u8_to_vec_field, u64_from_felt};
2930
use crate::utils::LocalWalletSignerMiddleware;
@@ -122,11 +123,12 @@ impl SettlementClient for StarknetSettlementClient {
122123
_onchain_data_hash: [u8; 32],
123124
_onchain_data_size: [u8; 32],
124125
) -> Result<String> {
125-
tracing::info!(
126+
info!(
126127
log_type = "starting",
127128
category = "update_state",
128-
function_type = "calldata",
129-
"Updating state with calldata."
129+
snos_output_len = %snos_output.len(),
130+
program_output_len = %program_output.len(),
131+
"Updating state with calldata"
130132
);
131133
let snos_output = slice_slice_u8_to_vec_field(snos_output.as_slice());
132134
let layout_bridge_output = slice_slice_u8_to_vec_field(program_output.as_slice());
@@ -136,13 +138,16 @@ impl SettlementClient for StarknetSettlementClient {
136138
.update_state(snos_output, layout_bridge_output)
137139
.await
138140
.map_err(|e| eyre!("Failed to update state with calldata: {:?}", e))?;
139-
tracing::info!(
141+
142+
let tx_hash = invoke_result.transaction_hash.to_hex_string();
143+
info!(
140144
log_type = "completed",
141145
category = "update_state",
142146
function_type = "calldata",
143-
"State updated with calldata."
147+
tx_hash = %tx_hash,
148+
"State update transaction submitted to Starknet with calldata"
144149
);
145-
Ok(invoke_result.transaction_hash.to_hex_string())
150+
Ok(tx_hash)
146151
}
147152

148153
/// Should verify the inclusion of a tx in the settlement layer

orchestrator/src/core/client/alert/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,14 @@ pub trait AlertClient: Send + Sync {
1717
///
1818
/// # Returns
1919
async fn send_message(&self, message_body: String) -> Result<(), AlertError>;
20+
21+
/// Perform a health check on the alert service
22+
///
23+
/// This method verifies that the alert service (e.g., AWS SNS) is accessible
24+
/// and the necessary permissions are in place.
25+
///
26+
/// # Returns
27+
/// * `Ok(())` - If the alert service is healthy and accessible
28+
/// * `Err(AlertError)` - If the health check fails
29+
async fn health_check(&self) -> Result<(), AlertError>;
2030
}

orchestrator/src/core/client/alert/sns.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,19 @@ impl AlertClient for SNS {
147147
self.client().publish().topic_arn(self.get_topic_arn().await?).message(message_body).send().await?;
148148
Ok(())
149149
}
150+
151+
async fn health_check(&self) -> Result<(), AlertError> {
152+
// Verify SNS accessibility by getting topic attributes
153+
// This checks both connectivity and permissions
154+
let topic_arn = self.get_topic_arn().await?;
155+
156+
self.client()
157+
.get_topic_attributes()
158+
.topic_arn(topic_arn)
159+
.send()
160+
.await
161+
.map_err(|e| AlertError::TopicARNInvalid(format!("SNS health check failed: {}", e)))?;
162+
163+
Ok(())
164+
}
150165
}

orchestrator/src/core/client/database/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,4 +388,19 @@ pub trait DatabaseClient: Send + Sync {
388388
&self,
389389
aggregator_index: u64,
390390
) -> Result<Vec<SnosBatch>, DatabaseError>;
391+
392+
// ================================================================================
393+
// Health Check Methods
394+
// ================================================================================
395+
396+
/// Perform a health check on the database connection
397+
///
398+
/// This method verifies that the database is accessible and operational.
399+
/// It should perform a lightweight operation that validates connectivity
400+
/// and basic functionality without impacting performance.
401+
///
402+
/// # Returns
403+
/// * `Ok(())` - If the database is healthy and accessible
404+
/// * `Err(DatabaseError)` - If the health check fails
405+
async fn health_check(&self) -> Result<(), DatabaseError>;
391406
}

orchestrator/src/core/client/database/mongodb.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,6 @@ impl DatabaseClient for MongoDbClient {
10241024
Ok(batches)
10251025
}
10261026

1027-
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
10281027
async fn get_snos_batches_without_jobs(
10291028
&self,
10301029
snos_batch_status: SnosBatchStatus,
@@ -1368,6 +1367,19 @@ impl DatabaseClient for MongoDbClient {
13681367
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
13691368
Ok(updated_batches)
13701369
}
1370+
1371+
async fn health_check(&self) -> Result<(), DatabaseError> {
1372+
let start = Instant::now();
1373+
1374+
// Perform a simple ping operation to verify connectivity
1375+
// This is a lightweight operation that checks if the database is accessible
1376+
self.database.run_command(doc! { "ping": 1 }, None).await?;
1377+
1378+
let attributes = [KeyValue::new("db_operation_name", "health_check")];
1379+
let duration = start.elapsed();
1380+
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
1381+
Ok(())
1382+
}
13711383
}
13721384

13731385
// Generic utility function to convert Vec<T> to Option<T>

orchestrator/src/core/client/queue/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,14 @@ pub trait QueueClient: Send + Sync {
1616
async fn get_consumer(&self, queue: QueueType) -> Result<SqsConsumer, QueueError>;
1717
async fn send_message(&self, queue: QueueType, payload: String, delay: Option<Duration>) -> Result<(), QueueError>;
1818
async fn consume_message_from_queue(&self, queue: QueueType) -> Result<Delivery, QueueError>;
19+
20+
/// Perform a health check on the queue service
21+
///
22+
/// This method verifies that the queue service (e.g., AWS SQS) is accessible
23+
/// and the necessary permissions are in place.
24+
///
25+
/// # Returns
26+
/// * `Ok(())` - If the queue service is healthy and accessible
27+
/// * `Err(QueueError)` - If the health check fails
28+
async fn health_check(&self) -> Result<(), QueueError>;
1929
}

orchestrator/src/core/client/queue/sqs.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ impl QueueClient for SQS {
311311
.queue_url(&queue_url)
312312
.max_number_of_messages(1)
313313
.message_attribute_names("All")
314-
.visibility_timeout(30)
315314
.send()
316315
.await?;
317316

@@ -435,4 +434,15 @@ impl QueueClient for SQS {
435434

436435
Ok(delivery)
437436
}
437+
438+
async fn health_check(&self) -> Result<(), QueueError> {
439+
// Verify SQS accessibility by getting queue attributes
440+
// This checks both connectivity and permissions
441+
let queue_name = self.get_queue_name(&QueueType::WorkerTrigger)?;
442+
let queue_url = self.inner.get_queue_url_from_client(queue_name.as_str()).await?;
443+
444+
self.inner.client().get_queue_attributes().queue_url(&queue_url).send().await?;
445+
446+
Ok(())
447+
}
438448
}

orchestrator/src/core/client/storage/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,14 @@ pub trait StorageClient: Send + Sync {
1919

2020
/// List files in a directory
2121
async fn list_files_in_dir(&self, dir_path: &str) -> Result<Vec<String>, StorageError>;
22+
23+
/// Perform a health check on the storage service
24+
///
25+
/// This method verifies that the storage service (e.g., AWS S3) is accessible
26+
/// and the necessary permissions are in place.
27+
///
28+
/// # Returns
29+
/// * `Ok(())` - If the storage service is healthy and accessible
30+
/// * `Err(StorageError)` - If the health check fails
31+
async fn health_check(&self) -> Result<(), StorageError>;
2232
}

0 commit comments

Comments
 (0)