-
Notifications
You must be signed in to change notification settings - Fork 641
feat: support multiple connections for nats #3606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: support multiple connections for nats #3606
Conversation
👋 Hi VanyaGlazunov! Thank you for contributing to ai-dynamo/dynamo. Just a reminder: The 🚀 |
WalkthroughThis change replaces the NATS environment variable NATS_SERVER with NATS_SERVERS across code, scripts, configs, and docs. Core runtime switches from single-server to multi-server configuration, updating APIs, parsing, validation, and connection logic. Related Rust components, tests, SLURM scripts, Helm/K8s manifests, ECS definitions, and documentation are updated accordingly. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App as App (KV Router/Runtime)
participant NATS as NATS Servers [S1,S2,...]
Note over App: Resolve NATS_SERVERS from env<br/>(comma-separated)
App->>App: Parse/validate server URLs (Vec<String>)
App->>NATS: Connect using builder.servers(servers)
alt First reachable server
NATS-->>App: Connection established
App->>NATS: Create/ensure JetStream (replicas if set)
NATS-->>App: Stream/Consumer ready
Note over App,NATS: Normal publish/subscribe flow
else All servers unreachable
NATS-->>App: Connection error
App-->>App: Propagate/connect error
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (1 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
docs/backends/vllm/multi-node.md (1)
41-41
: LGTM! Documentation updated correctly.The environment variable has been updated to
NATS_SERVERS
in the multi-node setup documentation, aligning with the new multi-server support.Optional: Consider adding a multi-server example.
While the single-server example is correct, it might be helpful to add an example showing how to specify multiple NATS servers for HA deployments, such as:
export NATS_SERVERS="nats://${HEAD_NODE_IP1}:4222,nats://${HEAD_NODE_IP2}:4222"This would help users understand the multi-server capability that this PR enables.
lib/llm/src/kv_router/subscriber.rs (1)
178-195
: Consider trimming whitespace when parsing server list.The multi-server parsing logic correctly splits on commas, but doesn't trim whitespace. This could cause issues if the environment variable contains spaces (e.g.,
"nats://server1:4222, nats://server2:4222"
).Consider applying this diff to make the parsing more robust:
- let nats_servers: Vec<String> = - std::env::var("NATS_SERVERS").unwrap_or_else(|_| "nats://localhost:4222".to_string()) - .split(',') - .map(|s| s.to_owned()) - .collect(); + let nats_servers: Vec<String> = + std::env::var("NATS_SERVERS").unwrap_or_else(|_| "nats://localhost:4222".to_string()) + .split(',') + .map(|s| s.trim().to_owned()) + .collect();lib/llm/src/kv_router/publisher.rs (1)
130-138
: Consider trimming whitespace when parsing server list.Same as in
subscriber.rs
, the parsing logic doesn't trim whitespace around server URLs. This could be made more robust.Apply this diff to add whitespace trimming:
- let nats_servers = - std::env::var("NATS_SERVERS").unwrap_or_else(|_| "nats://localhost:4222".to_string()) - .split(',') - .map(|s| s.to_owned()) - .collect(); + let nats_servers = + std::env::var("NATS_SERVERS").unwrap_or_else(|_| "nats://localhost:4222".to_string()) + .split(',') + .map(|s| s.trim().to_owned()) + .collect();Optional: Consider extracting this parsing logic into a helper function to reduce duplication between
publisher.rs
andsubscriber.rs
.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (27)
benchmarks/profiler/deploy/profile_sla_aic_job.yaml
(1 hunks)benchmarks/profiler/deploy/profile_sla_job.yaml
(1 hunks)benchmarks/profiler/deploy/profile_sla_moe_job.yaml
(1 hunks)components/backends/sglang/slurm_jobs/scripts/worker_setup.py
(1 hunks)components/backends/trtllm/multinode/srun_aggregated.sh
(2 hunks)components/backends/trtllm/multinode/srun_disaggregated.sh
(3 hunks)components/backends/trtllm/performance_sweeps/benchmark_agg.slurm
(4 hunks)components/backends/trtllm/performance_sweeps/benchmark_disagg.slurm
(2 hunks)deploy/cloud/operator/internal/dynamo/graph.go
(2 hunks)deploy/cloud/operator/internal/dynamo/graph_test.go
(10 hunks)deploy/helm/chart/templates/deployment.yaml
(1 hunks)deploy/helm/chart/templates/grove-podgangset.yaml
(1 hunks)deploy/inference-gateway/helm/dynamo-gaie/templates/dynamo-epp.yaml
(1 hunks)deploy/metrics/README.md
(1 hunks)docs/backends/vllm/multi-node.md
(1 hunks)docs/guides/metrics.md
(1 hunks)examples/basics/multinode/README.md
(2 hunks)examples/deployments/ECS/README.md
(2 hunks)examples/deployments/ECS/task_definition_frontend.json
(1 hunks)examples/deployments/ECS/task_definition_prefillworker.json
(1 hunks)examples/multimodal/README.md
(1 hunks)lib/bindings/python/tests/conftest.py
(2 hunks)lib/llm/src/kv_router/publisher.rs
(1 hunks)lib/llm/src/kv_router/subscriber.rs
(1 hunks)lib/runtime/src/transports/nats.rs
(18 hunks)tests/kvbm/test_determinism_agg.py
(1 hunks)tests/kvbm/test_determinism_disagg.py
(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-06-05T01:46:15.509Z
Learnt from: GuanLuo
PR: ai-dynamo/dynamo#1371
File: examples/llm/benchmarks/vllm_multinode_setup.sh:18-25
Timestamp: 2025-06-05T01:46:15.509Z
Learning: In multi-node setups with head/worker architecture, the head node typically doesn't need environment variables pointing to its own services (like NATS_SERVER, ETCD_ENDPOINTS) because local processes can access them via localhost. Only worker nodes need these environment variables to connect to the head node's external IP address.
Applied to files:
components/backends/trtllm/performance_sweeps/benchmark_disagg.slurm
components/backends/sglang/slurm_jobs/scripts/worker_setup.py
components/backends/trtllm/multinode/srun_aggregated.sh
examples/basics/multinode/README.md
docs/backends/vllm/multi-node.md
components/backends/trtllm/performance_sweeps/benchmark_agg.slurm
components/backends/trtllm/multinode/srun_disaggregated.sh
examples/multimodal/README.md
📚 Learning: 2025-07-03T10:14:30.570Z
Learnt from: fsaady
PR: ai-dynamo/dynamo#1730
File: examples/sglang/slurm_jobs/scripts/worker_setup.py:230-244
Timestamp: 2025-07-03T10:14:30.570Z
Learning: In examples/sglang/slurm_jobs/scripts/worker_setup.py, background processes (like nats-server, etcd) are intentionally left running even if later processes fail. This design choice allows users to manually connect to nodes and debug issues without having to restart the entire SLURM job from scratch, providing operational flexibility for troubleshooting in cluster environments.
Applied to files:
components/backends/sglang/slurm_jobs/scripts/worker_setup.py
components/backends/trtllm/performance_sweeps/benchmark_agg.slurm
🧬 Code graph analysis (3)
lib/llm/src/kv_router/subscriber.rs (1)
lib/runtime/src/transports/nats.rs (5)
s
(527-527)s
(533-533)new_with_consumer
(491-509)builder
(56-58)builder
(311-313)
lib/llm/src/kv_router/publisher.rs (1)
lib/runtime/src/transports/nats.rs (3)
s
(527-527)s
(533-533)new_without_consumer
(470-487)
lib/runtime/src/transports/nats.rs (1)
lib/llm/src/common/checked_file.rs (1)
is_nats_url
(85-87)
🪛 LanguageTool
examples/deployments/ECS/README.md
[grammar] ~85-~85: There might be a mistake here.
Context: ...TS_SERVERS|Value|nats://IP_ADDRESS:4222| - Docker configuration Add sh,-c
in **En...
(QB_NEW_EN)
[grammar] ~106-~106: There might be a mistake here.
Context: ...the Container Overrides, use the IP for ETCD/NATS task for the ETCD_ENDPOINTS
...
(QB_NEW_EN)
[grammar] ~106-~106: There might be a mistake here.
Context: ...CD_ENDPOINTSand
NATS_SERVERS` values. - After the deployment, an aggregated serv...
(QB_NEW_EN)
[grammar] ~107-~107: There might be a mistake here.
Context: ... you can test it with scripts in step 6. 3. Dynamo PrefillWorker Task - For disaggre...
(QB_NEW_EN)
[grammar] ~110-~110: There might be a mistake here.
Context: ...the Container Overrides, use the IP for ETCD/NATS task for the ETCD_ENDPOINTS
...
(QB_NEW_EN)
🪛 markdownlint-cli2 (0.18.1)
examples/deployments/ECS/README.md
85-85: Tables should be surrounded by blank lines
(MD058, blanks-around-tables)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build and Test - dynamo
- GitHub Check: clippy (.)
- GitHub Check: clippy (lib/bindings/python)
- GitHub Check: clippy (launch/dynamo-run)
🔇 Additional comments (27)
docs/guides/metrics.md (1)
78-78
: LGTM! Documentation updated correctly.The diagram label has been updated to reflect the new
NATS_SERVERS
environment variable name, maintaining consistency with the codebase changes.examples/deployments/ECS/task_definition_prefillworker.json (1)
26-27
: LGTM! Environment variable renamed correctly.The environment variable has been updated to
NATS_SERVERS
, aligning with the PR objective to support multiple NATS connections for HA clusters.deploy/helm/chart/templates/deployment.yaml (1)
106-109
: LGTM! Helm template updated correctly.The environment variable name in the Helm template has been updated to
NATS_SERVERS
, maintaining consistency with the multi-server support changes across the codebase.examples/multimodal/README.md (1)
288-289
: LGTM! Documentation comment updated correctly.The comment has been updated to reference
NATS_SERVERS
instead ofNATS_SERVER
, correctly reflecting the environment variable that worker nodes need for distributed coordination.deploy/cloud/operator/internal/dynamo/graph.go (2)
656-686
: LGTM! Go code updated correctly.The
addStandardEnvVars
function has been updated to useNATS_SERVERS
instead ofNATS_SERVER
, maintaining consistency with the multi-server support changes across the codebase.
689-689
: LGTM! Documentation comment updated correctly.The function documentation has been updated to reference
NATS_SERVERS
, ensuring code comments stay in sync with the implementation changes.examples/deployments/ECS/task_definition_frontend.json (1)
34-35
: LGTM! ECS task definition updated correctly.The environment variable has been updated to
NATS_SERVERS
in the frontend task definition, maintaining consistency with the multi-server support implementation.components/backends/sglang/slurm_jobs/scripts/worker_setup.py (1)
386-394
: LGTM! Python script updated correctly.The
setup_env
function has been updated to useNATS_SERVERS
instead ofNATS_SERVER
for both the environment variable assignment and the corresponding log message, ensuring consistency throughout the worker setup process.deploy/helm/chart/templates/grove-podgangset.yaml (1)
99-99
: LGTM!The environment variable rename from
NATS_SERVER
toNATS_SERVERS
is consistent with the PR's goal to support multiple NATS connections for HA clusters.examples/basics/multinode/README.md (2)
129-129
: LGTM!The environment variable rename is correctly documented. The value format
nats://${INFRA_NODE_IP}:4222
remains unchanged, maintaining backward compatibility for existing deployments that will update to use the new variable name.
435-435
: LGTM!The NATS CLI command correctly references the updated
NATS_SERVERS
environment variable. The command syntax is valid.deploy/inference-gateway/helm/dynamo-gaie/templates/dynamo-epp.yaml (1)
85-86
: LGTM!The environment variable rename is correct and maintains the existing Helm template structure. The value correctly constructs the NATS URL using the platform name and namespace.
components/backends/trtllm/performance_sweeps/benchmark_disagg.slurm (2)
115-115
: LGTM!The environment variable is correctly renamed and exported with the appropriate NATS URL format.
145-145
: LGTM!The container environment variable propagation correctly includes
NATS_SERVERS
alongside other environment variables.components/backends/trtllm/performance_sweeps/benchmark_agg.slurm (2)
64-64
: LGTM!The environment variable is correctly renamed and exported with the appropriate NATS URL format.
94-94
: LGTM!All three
srun
invocations correctly propagate the renamedNATS_SERVERS
environment variable to containers, ensuring consistency across the frontend, worker, and loadgen processes.Also applies to: 107-107, 116-116
deploy/cloud/operator/internal/dynamo/graph_test.go (1)
1325-1326
: LGTM!All test expectations have been consistently updated to use
NATS_SERVERS
instead ofNATS_SERVER
. The test cases cover various scenarios (single-node, multinode, SGLang, vLLM, frontend, worker, planner), ensuring comprehensive validation of the environment variable rename.Also applies to: 1468-1469, 1833-1834, 1986-1987, 2117-2118, 2251-2252, 2632-2633, 2772-2773, 2903-2904, 3044-3045
examples/deployments/ECS/README.md (2)
85-85
: LGTM!The environment variable table correctly documents
NATS_SERVERS
with the appropriate value formatnats://IP_ADDRESS:4222
.
106-106
: LGTM!The deployment instructions correctly reference
NATS_SERVERS
in the container override steps, ensuring consistency with the updated environment variable name.Also applies to: 110-110
components/backends/trtllm/multinode/srun_aggregated.sh (2)
29-29
: LGTM!The environment variable is correctly renamed and exported with the appropriate NATS URL format.
66-66
: LGTM!The container environment variable propagation correctly includes
NATS_SERVERS
, ensuring the worker receives the updated environment variable name.tests/kvbm/test_determinism_disagg.py (1)
94-94
: LGTM!The environment variable rename from
NATS_SERVER
toNATS_SERVERS
aligns with the PR's objective to support multiple NATS connections. Using a single server value in tests is appropriate.benchmarks/profiler/deploy/profile_sla_job.yaml (1)
25-25
: LGTM!The environment variable name change from
NATS_SERVER
toNATS_SERVERS
is consistent with the multi-server support being added across the codebase.benchmarks/profiler/deploy/profile_sla_moe_job.yaml (1)
25-25
: LGTM!The environment variable rename is consistent with the PR's changes to support multiple NATS servers.
lib/bindings/python/tests/conftest.py (3)
143-143
: LGTM!The environment variable rename from
NATS_SERVER
toNATS_SERVERS
is consistent with the multi-server support being added.
149-149
: LGTM!Environment variable rename is consistent with the PR changes.
295-295
: LGTM!Environment variable rename is consistent with the PR changes.
fn default_server() -> Vec<String> { | ||
match std::env::var("NATS_SERVERS") { | ||
Ok(possible_list_of_urls) => possible_list_of_urls | ||
.split(',') | ||
.map(|s| s.to_string()) | ||
.collect(), | ||
Err(_) => vec!["nats://localhost:4222".to_string()], | ||
} | ||
|
||
"nats://localhost:4222".to_string() | ||
} | ||
|
||
fn validate_nats_server(server: &str) -> Result<(), ValidationError> { | ||
if server.starts_with("nats://") { | ||
Ok(()) | ||
} else { | ||
Err(ValidationError::new("server must start with 'nats://'")) | ||
fn validate_nats_server(servers: &Vec<String>) -> Result<(), ValidationError> { | ||
for server in servers { | ||
if !is_nats_url(&server) { | ||
return Err(ValidationError::new("server {} must start with 'nats://'")); | ||
} | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trim whitespace when parsing NATS_SERVERS
.
default_server()
splits on commas but never trims. A very common pattern is to declare NATS_SERVERS="nats://a:4222, nats://b:4222"
. The second entry retains the leading space, fails is_nats_url()
, and the builder rejects the otherwise valid configuration. Please trim/filter each entry before validating/passing to the client.
fn default_server() -> Vec<String> {
match std::env::var("NATS_SERVERS") {
- Ok(possible_list_of_urls) => possible_list_of_urls
- .split(',')
- .map(|s| s.to_string())
- .collect(),
+ Ok(possible_list_of_urls) => possible_list_of_urls
+ .split(',')
+ .map(|s| s.trim())
+ .filter(|s| !s.is_empty())
+ .map(|s| s.to_string())
+ .collect(),
Err(_) => vec!["nats://localhost:4222".to_string()],
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn default_server() -> Vec<String> { | |
match std::env::var("NATS_SERVERS") { | |
Ok(possible_list_of_urls) => possible_list_of_urls | |
.split(',') | |
.map(|s| s.to_string()) | |
.collect(), | |
Err(_) => vec!["nats://localhost:4222".to_string()], | |
} | |
"nats://localhost:4222".to_string() | |
} | |
fn validate_nats_server(server: &str) -> Result<(), ValidationError> { | |
if server.starts_with("nats://") { | |
Ok(()) | |
} else { | |
Err(ValidationError::new("server must start with 'nats://'")) | |
fn validate_nats_server(servers: &Vec<String>) -> Result<(), ValidationError> { | |
for server in servers { | |
if !is_nats_url(&server) { | |
return Err(ValidationError::new("server {} must start with 'nats://'")); | |
} | |
} | |
Ok(()) | |
} | |
fn default_server() -> Vec<String> { | |
match std::env::var("NATS_SERVERS") { | |
Ok(possible_list_of_urls) => possible_list_of_urls | |
.split(',') | |
.map(|s| s.trim()) | |
.filter(|s| !s.is_empty()) | |
.map(|s| s.to_string()) | |
.collect(), | |
Err(_) => vec!["nats://localhost:4222".to_string()], | |
} | |
} | |
fn validate_nats_server(servers: &Vec<String>) -> Result<(), ValidationError> { | |
for server in servers { | |
if !is_nats_url(&server) { | |
return Err(ValidationError::new("server {} must start with 'nats://'")); | |
} | |
} | |
Ok(()) | |
} |
🤖 Prompt for AI Agents
In lib/runtime/src/transports/nats.rs around lines 285 to 303, default_server()
splits the NATS_SERVERS string on commas but does not trim entries, so values
like " nats://b:4222" keep a leading space and fail is_nats_url(); update
default_server() to map each split entry through trim() and filter out empty
strings before collecting into Vec<String> (e.g., .split(',').map(|s|
s.trim()).filter(|s| !s.is_empty()).map(|s| s.to_string()).collect()); no change
to the function signature is needed, but ensure validate_nats_server still
checks the trimmed values (or accept &Vec<String> as-is) so entries pass
is_nats_url() correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works for etcd, so should work for nats too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for how ETCD_ENDPOINTS is parsed in the codebase
rg -n -C3 'ETCD_ENDPOINTS.*split' --type=rust
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Also search for any etcd endpoint parsing logic
rg -n -C5 'std::env::var.*ETCD' --type=rust | head -100
Length of output: 2230
@VanyaGlazunov, you're absolutely right. I can confirm that ETCD_ENDPOINTS
parsing in lib/runtime/src/transports/etcd.rs
(lines 515-521) uses the exact same pattern:
fn default_servers() -> Vec<String> {
match std::env::var("ETCD_ENDPOINTS") {
Ok(possible_list_of_urls) => possible_list_of_urls
.split(',')
.map(|s| s.to_string())
.collect(),
Err(_) => vec!["http://localhost:2379".to_string()],
}
}
No trimming there either. Since this is the established pattern in the codebase and works for etcd, the NATS implementation should maintain consistency. My suggestion was premature—thank you for the clarification!
✏️ Learnings added
Learnt from: VanyaGlazunov
PR: ai-dynamo/dynamo#3606
File: lib/runtime/src/transports/nats.rs:285-303
Timestamp: 2025-10-14T15:35:28.901Z
Learning: In the Dynamo codebase, when parsing comma-separated environment variables for connection endpoints (ETCD_ENDPOINTS, NATS_SERVERS), the established pattern is to split on commas without trimming whitespace. This pattern is used consistently across lib/runtime/src/transports/etcd.rs and lib/runtime/src/transports/nats.rs. Do not suggest adding .trim() to these parsing functions as it would deviate from the established codebase convention.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
@coderabbitai generate docstrings |
Warning Docstrings generation is disabled for your repository or organization. |
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain@VanyaGlazunov: I'll generate docstrings for the NATS implementation. Let me first examine the main file to identify what needs documentation. 🏁 Script executed:
Length of output: 47084 @VanyaGlazunov: I'll generate comprehensive docstrings for the NATS implementation. Here are the docstrings for the key components: /// NATS client with JetStream support
///
/// Provides a high-level interface for interacting with NATS servers, including:
/// - Connection management with automatic retry logic
/// - JetStream context for persistent messaging
/// - Object store operations for file upload/download
/// - Stream and consumer management
/// - Service discovery and monitoring
///
/// # Examples
///
/// ```no_run
/// use dynamo_runtime::transports::nats::Client;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// let client = Client::builder()
/// .servers(vec!["nats://localhost:4222".to_string()])
/// .build()?
/// .connect()
/// .await?;
///
/// let streams = client.list_streams().await?;
/// println!("Available streams: {:?}", streams);
/// Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Client {
client: client::Client,
js_ctx: jetstream::Context,
} /// Authentication configuration for NATS connections
///
/// Supports multiple authentication methods, prioritized in the following order:
/// 1. Username/Password (requires both `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD`)
/// 2. Token authentication via `NATS_AUTH_TOKEN`
/// 3. NKey authentication via `NATS_AUTH_NKEY`
/// 4. Credentials file via `NATS_AUTH_CREDENTIALS_FILE`
///
/// When no environment variables are set, defaults to username "user" and password "user".
///
/// # Examples
///
/// ```
/// use dynamo_runtime::transports::nats::NatsAuth;
///
/// // Token authentication
/// let auth = NatsAuth::Token("my-secret-token".to_string());
///
/// // Username/Password authentication
/// let auth = NatsAuth::UserPass("admin".to_string(), "password".to_string());
/// ```
#[derive(Clone, Eq, PartialEq)]
pub enum NatsAuth {
/// Username and password authentication
UserPass(String, String),
/// Token-based authentication
Token(String),
/// NKey-based authentication
NKey(String),
/// Credentials file path for authentication
CredentialsFile(PathBuf),
} /// Validates that all server URLs start with the NATS protocol prefix
///
/// # Arguments
///
/// * `servers` - List of server URLs to validate
///
/// # Returns
///
/// `Ok(())` if all servers are valid, `Err(ValidationError)` otherwise
///
/// # Examples
///
/// ```no_run
/// use dynamo_runtime::transports::nats::validate_nats_server;
///
/// let servers = vec!["nats://localhost:4222".to_string()];
/// assert!(validate_nats_server(&servers).is_ok());
///
/// let invalid_servers = vec!["http://localhost:4222".to_string()];
/// assert!(validate_nats_server(&invalid_servers).is_err());
/// ```
fn validate_nats_server(servers: &Vec<String>) -> Result<(), ValidationError> {
// ... existing implementation
} /// Retrieves the default NATS server list from the environment
///
/// Reads the `NATS_SERVERS` environment variable, expecting a comma-separated list
/// of server URLs. If the variable is not set, returns a single-element vector
/// with "nats://localhost:4222".
///
/// # Returns
///
/// Vector of NATS server URLs
///
/// # Examples
///
/// ```no_run
/// // With NATS_SERVERS="nats://server1:4222,nats://server2:4222"
/// let servers = default_server();
/// assert_eq!(servers.len(), 2);
/// ```
fn default_server() -> Vec<String> {
// ... existing implementation
} impl Client {
/// Retrieves stream state information
///
/// # Arguments
///
/// * `stream_name` - Name of the stream to query
///
/// # Returns
///
/// Current state of the stream including message counts, consumer counts, etc.
///
/// # Errors
///
/// Returns an error if the stream doesn't exist or cannot be queried
pub async fn stream_info(&self, stream_name: &str) -> Result<jetstream::stream::State> {
// ... existing implementation
}
/// Retrieves a JetStream stream by name
///
/// # Arguments
///
/// * `name` - Name of the stream to retrieve
///
/// # Returns
///
/// The stream object for further operations
///
/// # Errors
///
/// Returns an error if the stream doesn't exist or cannot be accessed
pub async fn get_stream(&self, name: &str) -> Result<jetstream::stream::Stream> {
// ... existing implementation
}
} /// A queue implementation using NATS JetStream for reliable message delivery
///
/// Provides both work queue and broadcast patterns:
/// - **Work Queue Pattern**: Multiple consumers share a single consumer group ("worker-group"),
/// and each message is delivered to only one consumer in the group. Use [`NatsQueue::new`].
/// - **Broadcast Pattern**: Each consumer has a unique consumer name and receives all messages
/// independently. Use [`NatsQueue::new_with_consumer`].
///
/// # Stream and Subject Naming
///
/// The queue automatically sanitizes stream names by converting them to slugs and creates
/// subjects with the pattern `{stream_name}.*`. Messages are published to `{stream_name}.queue`.
///
/// # Environment Variables
///
/// - `DYN_NATS_STREAM_MAX_AGE`: Maximum age for messages in seconds (default: 3600)
/// - `NATS_JETSTREAM_REPLICAS`: Number of replicas for the JetStream stream (default: 1)
///
/// # Examples
///
/// ```no_run
/// use dynamo_runtime::transports::nats::NatsQueue;
/// use tokio::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// // Work queue pattern
/// let mut queue = NatsQueue::new(
/// "my-stream".to_string(),
/// vec!["nats://localhost:4222".to_string()],
/// Duration::from_secs(30)
/// );
/// queue.connect().await?;
///
/// // Enqueue a task
/// queue.enqueue_task(b"task data".to_vec().into()).await?;
///
/// // Dequeue a task
/// if let Some(data) = queue.dequeue_task(None).await? {
/// println!("Received: {:?}", data);
/// }
///
/// Ok(())
/// }
/// ```
pub struct NatsQueue {
// ... existing fields
} impl NatsQueue {
/// Creates a new NatsQueue with the default "worker-group" consumer
///
/// This implements the work queue pattern where multiple consumers share
/// the same consumer group, and each message is delivered to only one consumer.
///
/// # Arguments
///
/// * `stream_name` - Name of the stream (will be sanitized to a valid slug)
/// * `nats_servers` - List of NATS server URLs to connect to
/// * `dequeue_timeout` - Default timeout for dequeue operations
///
/// # Returns
///
/// A new NatsQueue instance (not yet connected)
pub fn new(stream_name: String, nats_servers: Vec<String>, dequeue_timeout: time::Duration) -> Self {
// ... existing implementation
}
/// Creates a new NatsQueue without a consumer (publisher-only mode)
///
/// Use this when you only need to publish messages and don't need to consume.
/// Attempting to call dequeue methods will result in an error.
///
/// # Arguments
///
/// * `stream_name` - Name of the stream (will be sanitized to a valid slug)
/// * `nats_servers` - List of NATS server URLs to connect to
/// * `dequeue_timeout` - Default timeout for dequeue operations (unused for publisher-only)
///
/// # Returns
///
/// A new NatsQueue instance configured for publishing only
pub fn new_without_consumer(
stream_name: String,
nats_servers: Vec<String>,
dequeue_timeout: time::Duration,
) -> Self {
// ... existing implementation
}
/// Creates a new NatsQueue with a specific consumer name for broadcast pattern
///
/// This implements the broadcast pattern where each consumer with a unique name
/// receives all messages independently. Useful for fan-out scenarios where multiple
/// services need to process the same events.
///
/// # Arguments
///
/// * `stream_name` - Name of the stream (will be sanitized to a valid slug)
/// * `nats_servers` - List of NATS server URLs to connect to
/// * `dequeue_timeout` - Default timeout for dequeue operations
/// * `consumer_name` - Unique name for this consumer
///
/// # Returns
///
/// A new NatsQueue instance with a unique consumer
pub fn new_with_consumer(
stream_name: String,
nats_servers: Vec<String>,
dequeue_timeout: time::Duration,
consumer_name: String,
) -> Self {
// ... existing implementation
}
/// Connects to the NATS server and sets up the stream and consumer
///
/// Creates or retrieves the JetStream stream and consumer. If the stream already
/// exists, it will be reused. The consumer is created as a durable consumer that
/// persists across reconnections.
///
/// # Errors
///
/// Returns an error if:
/// - Connection to any NATS server fails
/// - JetStream is not available
/// - Stream or consumer creation fails
pub async fn connect(&mut self) -> Result<()> {
// ... existing implementation
}
/// Connects to the NATS server and optionally resets the stream
///
/// # Arguments
///
/// * `reset_stream` - If true, purges all existing messages from the stream
///
/// # Errors
///
/// Returns an error if connection or setup fails
pub async fn connect_with_reset(&mut self, reset_stream: bool) -> Result<()> {
// ... existing implementation
}
/// Ensures an active connection exists
///
/// If not connected, this will call [`Self::connect`]. Use this before operations
/// that require a connection if you're not sure whether the queue is connected.
///
/// # Errors
///
/// Returns an error if connection fails
pub async fn ensure_connection(&mut self) -> Result<()> {
// ... existing implementation
}
/// Closes the connection and cleans up resources
///
/// This closes the NATS client connection but does not delete the consumer
/// from the server. To permanently remove the consumer, use [`Self::shutdown`].
///
/// # Errors
///
/// Currently always returns `Ok(())`
pub async fn close(&mut self) -> Result<()> {
// ... existing implementation
}
/// Shuts down the consumer by deleting it from the stream and closing the connection
///
/// This permanently removes the consumer from the NATS server. If `consumer_name`
/// is provided, that specific consumer will be deleted instead of the current consumer.
/// This allows deletion of other consumers on the same stream.
///
/// # Arguments
///
/// * `consumer_name` - Optional name of a specific consumer to delete. If None,
/// deletes this queue's consumer.
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Consumer deletion fails
///
/// # Examples
///
/// ```no_run
/// # use dynamo_runtime::transports::nats::NatsQueue;
/// # use tokio::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// let mut queue = NatsQueue::new(
/// "my-stream".to_string(),
/// vec!["nats://localhost:4222".to_string()],
/// Duration::from_secs(30)
/// );
/// queue.connect().await?;
///
/// // Delete this consumer and close connection
/// queue.shutdown(None).await?;
///
/// // Or delete a specific consumer by name
/// queue.shutdown(Some("other-consumer".to_string())).await?;
/// # Ok(())
/// # }
/// ```
pub async fn shutdown(&mut self, consumer_name: Option<String>) -> Result<()> {
// ... existing implementation
}
/// Counts the number of consumers for the stream
///
/// # Returns
///
/// The total number of consumers attached to this stream
///
/// # Errors
///
/// Returns an error if not connected or stream info cannot be retrieved
pub async fn count_consumers(&mut self) -> Result<usize> {
// ... existing implementation
}
/// Lists all consumer names for the stream
///
/// # Returns
///
/// Vector of consumer names attached to this stream
///
/// # Errors
///
/// Returns an error if not connected or consumer list cannot be retrieved
pub async fn list_consumers(&mut self) -> Result<Vec<String>> {
// ... existing implementation
}
/// Enqueues a task using the provided data
///
/// Publishes the data to the JetStream stream with guaranteed delivery.
/// The message will be persisted until consumed and acknowledged.
///
/// # Arguments
///
/// * `task_data` - Raw bytes to enqueue
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Publish operation fails
pub async fn enqueue_task(&mut self, task_data: Bytes) -> Result<()> {
// ... existing implementation
}
/// Dequeues and returns a task as raw bytes
///
/// Fetches the next available message from the stream and automatically
/// acknowledges it. If no message is available within the timeout period,
/// returns `None`.
///
/// # Arguments
///
/// * `timeout` - Optional timeout override. If None, uses the default timeout
/// specified during queue creation.
///
/// # Returns
///
/// `Some(Bytes)` if a message was received, `None` if timeout elapsed
///
/// # Errors
///
/// Returns an error if:
/// - Consumer is not initialized (publisher-only mode)
/// - Fetch or acknowledgment fails
pub async fn dequeue_task(&mut self, timeout: Option<time::Duration>) -> Result<Option<Bytes>> {
// ... existing implementation
}
/// Gets the number of messages pending for this consumer
///
/// Returns the count of messages that are in the stream but not yet
/// consumed by this specific consumer.
///
/// # Returns
///
/// Number of pending messages for this consumer
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Consumer info cannot be retrieved
pub async fn get_queue_size(&mut self) -> Result<u64> {
// ... existing implementation
}
/// Gets the total number of messages currently in the stream
///
/// Returns the total count of messages stored in the stream, regardless
/// of whether they've been consumed by any consumer.
///
/// # Returns
///
/// Total number of messages in the stream
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Stream info cannot be retrieved
pub async fn get_stream_messages(&mut self) -> Result<u64> {
// ... existing implementation
}
/// Purges messages from the stream up to (but not including) the specified sequence number
///
/// This permanently removes messages and affects all consumers of the stream.
/// The purge excludes the message at the specified sequence number itself.
///
/// # Arguments
///
/// * `sequence` - Sequence number to purge up to (exclusive)
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Purge operation fails
///
/// # Examples
///
/// ```no_run
/// # use dynamo_runtime::transports::nats::NatsQueue;
/// # use tokio::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let mut queue = NatsQueue::new(
/// # "my-stream".to_string(),
/// # vec!["nats://localhost:4222".to_string()],
/// # Duration::from_secs(30)
/// # );
/// # queue.connect().await?;
/// // Purge messages 1 and 2 (keeps message 3 and onwards)
/// queue.purge_up_to_sequence(3).await?;
/// # Ok(())
/// # }
/// ```
pub async fn purge_up_to_sequence(&self, sequence: u64) -> Result<()> {
// ... existing implementation
}
/// Purges messages from the stream up to the minimum acknowledged sequence across all consumers
///
/// This finds the lowest acknowledged sequence number across all consumers and purges
/// up to that point (inclusive). This is safe to call as it ensures no consumer will
/// lose unprocessed messages.
///
/// If no messages have been acknowledged by any consumer, no purge occurs.
///
/// # Errors
///
/// Returns an error if:
/// - Not connected to NATS
/// - Consumer or stream info cannot be retrieved
/// - Purge operation fails
///
/// # Examples
///
/// ```no_run
/// # use dynamo_runtime::transports::nats::NatsQueue;
/// # use tokio::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let mut queue = NatsQueue::new(
/// # "my-stream".to_string(),
/// # vec!["nats://localhost:4222".to_string()],
/// # Duration::from_secs(30)
/// # );
/// # queue.connect().await?;
/// // Safely purge all messages that have been acknowledged by all consumers
/// queue.purge_acknowledged().await?;
/// # Ok(())
/// # }
/// ```
pub async fn purge_acknowledged(&mut self) -> Result<()> {
// ... existing implementation
}
} These docstrings follow Rust documentation conventions and provide:
You can copy and paste these directly into your code, replacing or enhancing the existing docstrings. |
0572b7b
to
482c15f
Compare
9cef902
to
fa9d505
Compare
Return to NATS_SERVER env name for backward compatibility |
…ER -> NATS_SERVERS Signed-off-by: Ivan Glazunov <[email protected]>
Signed-off-by: Ivan Glazunov <[email protected]>
Signed-off-by: Ivan Glazunov <[email protected]>
Signed-off-by: Ivan Glazunov <[email protected]>
fa9d505
to
cc02fb6
Compare
Overview:
Supporting multiple nats connections is useful for building HA clusters. This is done for etcd but not for nats for some reason. Both etcd and nats connections are implemented through external libraries and they support several endpoints, so it was not much work to implement it. Please tell me if this is unwanted and why
Details:
I changed NATS_SERVER -> NATS_SERVERS through out all project and add support for multiple endpoints in rust wrapper of nats. Also added env var NATS_JETSTREAM_REPLICAS to set number of replicas for jetstreams that is used by NatsQueue.
Where should the reviewer start?
Please start at nats wrapper in dynamo/lib/runtime/src/transports/nats.rs
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
None
Summary by CodeRabbit
New Features
Refactor
Documentation