Skip to content

Commit 4f480c6

Browse files
feat: Implement client pooling for Webhook-based notifiers (#281)
* feat: Move generic client storage to a separate module * feat: Add NotificationClientPool for managing HTTP clients in notifications * test: Add unit tests for NotificationClientPool functionality * feat: Extend NotificationClientPool to manage SMTP clients and improve error handling * feat: make HTTP retry configuration serializable * feat: add retry policy to TriggerTypeConfig and update related tests * feat: add retry policy to TriggerTypeConfig and update related notification implementations * feat: add client pool to NotificationService * chore: formatting * feat: add default serde attribute to retry_policy in TriggerTypeConfig variants * fix: minor * feat: refactor notification services to use a shared HTTP client * feat: update WebhookNotifier to use a retryable HTTP client with TransientErrorRetryStrategy * test: update integration tests to include retryable and non-retryable error scenarios * feat: update TelegramNotifier to use POST method and JSON payload for webhook notifications * minor * feat: refactor EmailNotifier to accept an SMTP client and update notification service to use client pool * feat: update NotificationClientPool to use ClientWithMiddleware * test: refactor test to use test http client helpers * feat: enhance SmtpConfig with additional traits and implement SMTP client creation in NotificationClientPool * test: adjust error context assertions and add additional test * test: add missing tests * test: update retry expectations in notification tests to use dynamic retry count * refactor: improve documentation for HTTP and SMTP client methods in NotificationClientPool * minor * refactor: remove unnecessary WebhookPayload struct and simplify payload handling * test: remove redundant test * Update src/services/notification/mod.rs Co-authored-by: Nicolas Molina <[email protected]> * refactor: improve error message for missing retry policy in trigger configuration * fix: pass source error to trigger execution error * docs: update notification protocols section * docs: clarify base_for_backoff parameter description in retry policy --------- Co-authored-by: Nicolas Molina <[email protected]>
1 parent 7f8c885 commit 4f480c6

File tree

32 files changed

+1332
-405
lines changed

32 files changed

+1332
-405
lines changed

docs/modules/ROOT/pages/index.adoc

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ The monitor implements protocol security validations across different components
202202

203203
===== Notification Protocols
204204

205+
====== Webhook Notifications
206+
* *HTTPS Recommended*: URLs should use HTTPS protocol
207+
* *Authentication Recommended*: Including either:
208+
** `X-API-Key` header
209+
** `Authorization` header
210+
* *Optional Secret*: Can include a secret for HMAC authentication
211+
** When a secret is provided, the monitor will:
212+
*** Generate a timestamp in milliseconds
213+
*** Create an HMAC-SHA256 signature of the payload and timestamp
214+
*** Add the signature in the `X-Signature` header
215+
*** Add the timestamp in the `X-Timestamp` header
216+
** The signature is computed as: `HMAC-SHA256(secret, payload + timestamp)`
217+
* *Warning*: Non-HTTPS URLs or missing authentication headers will trigger security warnings
218+
205219
====== Slack Notifications
206220
* *HTTPS Recommended*: Webhook URLs should start with `https://hooks.slack.com/`
207221
* *Warning*: Non-HTTPS URLs will trigger security warnings
@@ -210,6 +224,14 @@ The monitor implements protocol security validations across different components
210224
* *HTTPS Recommended*: Webhook URLs should start with `https://discord.com/api/webhooks/`
211225
* *Warning*: Non-HTTPS URLs will trigger security warnings
212226

227+
====== Telegram Notifications
228+
* **Protocol:** `POST` request with a `application/json` payload to the `sendMessage` method.
229+
* **Endpoint:** `https://api.telegram.org/bot<token>/sendMessage`
230+
* **Security:**
231+
** **HTTPS Required:** The API endpoint uses HTTPS.
232+
** Authentication is handled via the **Bot Token** in the URL. Keep this token secure.
233+
* **Formatting:** Messages are sent with `parse_mode` set to `MarkdownV2`. Special characters in the message title and body are automatically escaped to prevent formatting errors.
234+
213235
====== Email Notifications
214236
* *Secure Ports Recommended*: The following ports are considered secure:
215237
** 465: SMTPS (SMTP over SSL)
@@ -218,19 +240,27 @@ The monitor implements protocol security validations across different components
218240
* *Warning*: Using other ports will trigger security warnings
219241
* *Valid Format*: Email addresses must follow RFC 5322 format
220242

221-
====== Webhook Notifications
222-
* *HTTPS Recommended*: URLs should use HTTPS protocol
223-
* *Authentication Recommended*: Including either:
224-
** `X-API-Key` header
225-
** `Authorization` header
226-
* *Optional Secret*: Can include a secret for HMAC authentication
227-
** When a secret is provided, the monitor will:
228-
*** Generate a timestamp in milliseconds
229-
*** Create an HMAC-SHA256 signature of the payload and timestamp
230-
*** Add the signature in the `X-Signature` header
231-
*** Add the timestamp in the `X-Timestamp` header
232-
** The signature is computed as: `HMAC-SHA256(secret, payload + timestamp)`
233-
* *Warning*: Non-HTTPS URLs or missing authentication headers will trigger security warnings
243+
====== Notifcations Retry Policy
244+
245+
Following notification protocols support retry policies:
246+
247+
* Slack
248+
* Discord
249+
* Telegram
250+
* Webhook
251+
252+
Default retry policy is using exponential backoff with the following parameters:
253+
[cols="1,1,1"]
254+
|===
255+
| Parameter | Default Value | Description
256+
| `max_retries` | `3` | Maximum number of retries before giving up
257+
| `base_for_backoff` | `2` | Base duration for exponential backoff calculations in seconds
258+
| `initial_backoff` | `250` | Initial backoff duration in milliseconds
259+
| `max_backoff` | `10` | Maximum backoff duration in seconds
260+
| `jitter` | `Full` | Jitter strategy to apply to the backoff duration, currently supports `Full` and `None`
261+
|===
262+
263+
These parameters can be overridden by providing custom `HttpRetryConfig` struct in `retry_policy` field in trigger configuration.
234264

235265
===== Script Security
236266

docs/modules/ROOT/pages/rpc.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub struct HttpRetryConfig {
147147
/// Maximum backoff duration for retries.
148148
pub max_backoff: Duration,
149149
/// Base for the exponential backoff calculation (e.g., 2).
150-
pub base_for_backoff: u64, // Added field
150+
pub base_for_backoff: u64,
151151
/// Jitter to apply to the backoff duration.
152152
pub jitter: Jitter,
153153
}

src/bootstrap/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 's
391391
let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
392392
for monitor_match in &filtered_matches {
393393
if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
394-
TriggerError::execution_error(e.to_string(), None, None);
394+
TriggerError::execution_error(e.to_string(), Some(e.into()), None);
395395
}
396396
}
397397
} => {}

src/models/config/trigger_config.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,12 @@ impl ConfigLoader for Trigger {
254254

255255
match &self.trigger_type {
256256
TriggerType::Slack => {
257-
if let TriggerTypeConfig::Slack { slack_url, message } = &self.config {
257+
if let TriggerTypeConfig::Slack {
258+
slack_url,
259+
message,
260+
retry_policy: _,
261+
} = &self.config
262+
{
258263
// Validate webhook URL
259264
if !slack_url.starts_with("https://hooks.slack.com/") {
260265
return Err(ConfigError::validation_error(
@@ -698,6 +703,7 @@ mod tests {
698703
use crate::models::NotificationMessage;
699704
use crate::models::{core::Trigger, ScriptLanguage, SecretString};
700705
use crate::utils::tests::builders::trigger::TriggerBuilder;
706+
use crate::utils::HttpRetryConfig;
701707
use std::{fs::File, io::Write, os::unix::fs::PermissionsExt};
702708
use tempfile::TempDir;
703709
use tracing_test::traced_test;
@@ -1440,6 +1446,7 @@ mod tests {
14401446
title: "Test".to_string(),
14411447
body: "x".repeat(TELEGRAM_MAX_BODY_LENGTH + 1), // Exceeds max length
14421448
},
1449+
retry_policy: HttpRetryConfig::default(),
14431450
},
14441451
};
14451452
assert!(max_body_length.validate().is_err());
@@ -1458,6 +1465,7 @@ mod tests {
14581465
title: "Test".to_string(),
14591466
body: "z".repeat(DISCORD_MAX_BODY_LENGTH + 1), // Exceeds max length
14601467
},
1468+
retry_policy: HttpRetryConfig::default(),
14611469
},
14621470
};
14631471
assert!(max_body_length.validate().is_err());

src/models/core/trigger.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::models::{core::ScriptLanguage, SecretValue};
1+
use crate::{
2+
models::{core::ScriptLanguage, SecretValue},
3+
utils::HttpRetryConfig,
4+
};
25
use email_address::EmailAddress;
36
use serde::{Deserialize, Serialize};
47

@@ -56,6 +59,9 @@ pub enum TriggerTypeConfig {
5659
slack_url: SecretValue,
5760
/// Notification message
5861
message: NotificationMessage,
62+
/// Retry policy for HTTP requests
63+
#[serde(default)]
64+
retry_policy: HttpRetryConfig,
5965
},
6066
/// Email notification configuration
6167
Email {
@@ -86,6 +92,9 @@ pub enum TriggerTypeConfig {
8692
headers: Option<std::collections::HashMap<String, String>>,
8793
/// Notification message
8894
message: NotificationMessage,
95+
/// Retry policy for HTTP requests
96+
#[serde(default)]
97+
retry_policy: HttpRetryConfig,
8998
},
9099
/// Telegram notification configuration
91100
Telegram {
@@ -97,13 +106,19 @@ pub enum TriggerTypeConfig {
97106
disable_web_preview: Option<bool>,
98107
/// Notification message
99108
message: NotificationMessage,
109+
/// Retry policy for HTTP requests
110+
#[serde(default)]
111+
retry_policy: HttpRetryConfig,
100112
},
101113
/// Discord notification configuration
102114
Discord {
103115
/// Discord webhook URL
104116
discord_url: SecretValue,
105117
/// Notification message
106118
message: NotificationMessage,
119+
/// Retry policy for HTTP requests
120+
#[serde(default)]
121+
retry_policy: HttpRetryConfig,
107122
},
108123
/// Script execution configuration
109124
Script {
@@ -118,3 +133,16 @@ pub enum TriggerTypeConfig {
118133
timeout_ms: u32,
119134
},
120135
}
136+
137+
impl TriggerTypeConfig {
138+
/// Get the retry policy for the trigger type, if applicable.
139+
pub fn get_retry_policy(&self) -> Option<HttpRetryConfig> {
140+
match self {
141+
Self::Slack { retry_policy, .. } => Some(retry_policy.clone()),
142+
Self::Discord { retry_policy, .. } => Some(retry_policy.clone()),
143+
Self::Webhook { retry_policy, .. } => Some(retry_policy.clone()),
144+
Self::Telegram { retry_policy, .. } => Some(retry_policy.clone()),
145+
_ => None,
146+
}
147+
}
148+
}

src/services/blockchain/pool.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//! The pool uses a fast path for existing clients and a slow path for
1111
//! creating new ones, optimizing performance while maintaining safety.
1212
13+
use crate::utils::client_storage::ClientStorage;
1314
use crate::{
1415
models::{BlockChainType, Network},
1516
services::blockchain::{
@@ -21,7 +22,6 @@ use anyhow::Context;
2122
use async_trait::async_trait;
2223
use futures::future::BoxFuture;
2324
use std::{any::Any, collections::HashMap, sync::Arc};
24-
use tokio::sync::RwLock;
2525

2626
/// Trait for the client pool.
2727
#[async_trait]
@@ -41,22 +41,6 @@ pub trait ClientPoolTrait: Send + Sync {
4141
) -> Result<Arc<Self::StellarClient>, anyhow::Error>;
4242
}
4343

44-
/// Generic client storage that can hold any type of blockchain client
45-
///
46-
/// Clients are stored in a thread-safe way using a HashMap and an RwLock.
47-
/// The HashMap is indexed by the network slug and the value is an Arc of the client.
48-
pub struct ClientStorage<T> {
49-
clients: Arc<RwLock<HashMap<String, Arc<T>>>>,
50-
}
51-
52-
impl<T> ClientStorage<T> {
53-
pub fn new() -> Self {
54-
Self {
55-
clients: Arc::new(RwLock::new(HashMap::new())),
56-
}
57-
}
58-
}
59-
6044
/// Main client pool manager that handles multiple blockchain types.
6145
///
6246
/// Provides type-safe access to cached blockchain clients. Clients are created

src/services/notification/discord.rs

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
//! via incoming webhooks, supporting message templates with variable substitution.
55
66
use async_trait::async_trait;
7+
use reqwest_middleware::ClientWithMiddleware;
78
use serde::Serialize;
89
use serde_json;
9-
use std::collections::HashMap;
10+
use std::{collections::HashMap, sync::Arc};
1011

1112
use crate::{
1213
models::TriggerTypeConfig,
@@ -77,22 +78,26 @@ impl DiscordNotifier {
7778
/// * `url` - Discord webhook URL
7879
/// * `title` - Message title
7980
/// * `body_template` - Message template with variables
81+
/// * `http_client` - HTTP client with middleware for retries
8082
pub fn new(
8183
url: String,
8284
title: String,
8385
body_template: String,
86+
http_client: Arc<ClientWithMiddleware>,
8487
) -> Result<Self, NotificationError> {
88+
let config = WebhookConfig {
89+
url,
90+
url_params: None,
91+
title,
92+
body_template,
93+
method: Some("POST".to_string()),
94+
secret: None,
95+
headers: None,
96+
payload_fields: None,
97+
};
98+
8599
Ok(Self {
86-
inner: WebhookNotifier::new(WebhookConfig {
87-
url,
88-
url_params: None,
89-
title,
90-
body_template,
91-
method: Some("POST".to_string()),
92-
secret: None,
93-
headers: None,
94-
payload_fields: None,
95-
})?,
100+
inner: WebhookNotifier::new(config, http_client)?,
96101
})
97102
}
98103

@@ -112,13 +117,18 @@ impl DiscordNotifier {
112117
///
113118
/// # Arguments
114119
/// * `config` - Trigger configuration containing Discord parameters
120+
/// * `http_client` - HTTP client with middleware for retries
115121
///
116122
/// # Returns
117123
/// * `Result<Self, NotificationError>` - Notifier instance if config is Discord type
118-
pub fn from_config(config: &TriggerTypeConfig) -> Result<Self, NotificationError> {
124+
pub fn from_config(
125+
config: &TriggerTypeConfig,
126+
http_client: Arc<ClientWithMiddleware>,
127+
) -> Result<Self, NotificationError> {
119128
if let TriggerTypeConfig::Discord {
120129
discord_url,
121130
message,
131+
..
122132
} = config
123133
{
124134
let webhook_config = WebhookConfig {
@@ -133,7 +143,7 @@ impl DiscordNotifier {
133143
};
134144

135145
Ok(Self {
136-
inner: WebhookNotifier::new(webhook_config)?,
146+
inner: WebhookNotifier::new(webhook_config, http_client)?,
137147
})
138148
} else {
139149
let msg = format!("Invalid discord configuration: {:?}", config);
@@ -173,7 +183,10 @@ impl Notifier for DiscordNotifier {
173183

174184
#[cfg(test)]
175185
mod tests {
176-
use crate::models::{NotificationMessage, SecretString, SecretValue};
186+
use crate::{
187+
models::{NotificationMessage, SecretString, SecretValue},
188+
utils::{tests::create_test_http_client, HttpRetryConfig},
189+
};
177190

178191
use super::*;
179192

@@ -182,6 +195,7 @@ mod tests {
182195
"https://non-existent-url-discord-webhook.com".to_string(),
183196
"Alert".to_string(),
184197
body_template.to_string(),
198+
create_test_http_client(),
185199
)
186200
.unwrap()
187201
}
@@ -195,6 +209,7 @@ mod tests {
195209
title: "Test Alert".to_string(),
196210
body: "Test message ${value}".to_string(),
197211
},
212+
retry_policy: HttpRetryConfig::default(),
198213
}
199214
}
200215

@@ -242,8 +257,8 @@ mod tests {
242257
#[test]
243258
fn test_from_config_with_discord_config() {
244259
let config = create_test_discord_config();
245-
246-
let notifier = DiscordNotifier::from_config(&config);
260+
let http_client = create_test_http_client();
261+
let notifier = DiscordNotifier::from_config(&config, http_client);
247262
assert!(notifier.is_ok());
248263

249264
let notifier = notifier.unwrap();
@@ -252,24 +267,6 @@ mod tests {
252267
assert_eq!(notifier.inner.body_template, "Test message ${value}");
253268
}
254269

255-
#[test]
256-
fn test_from_config_invalid_type() {
257-
// Create a Slack config instead of Discord
258-
let config = TriggerTypeConfig::Slack {
259-
slack_url: SecretValue::Plain(SecretString::new("random.url".to_string())),
260-
message: NotificationMessage {
261-
title: "Test Slack".to_string(),
262-
body: "This is a test message".to_string(),
263-
},
264-
};
265-
266-
let notifier = DiscordNotifier::from_config(&config);
267-
assert!(notifier.is_err());
268-
269-
let error = notifier.unwrap_err();
270-
assert!(matches!(error, NotificationError::ConfigError { .. }));
271-
}
272-
273270
////////////////////////////////////////////////////////////
274271
// notify tests
275272
////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)