Open
Description
I am building a Rust-based service using the ntex framework, and my goal is to notify specific WebSocket clients in real-time when a REST API endpoint is triggered. Below are the key features I want to achieve:
Features:
- WebSocket Clients: Clients connect to the
/ws/{machine_id}
endpoint, wheremachine_id
uniquely identifies each client. - REST API Trigger: A REST API endpoint (
/notify-machine
) will be called with amachine_id
and a message. This should notify the WebSocket client associated with thatmachine_id
in real time. - Shared State: I need to maintain an in-memory registry to map
machine_id
to WebSocket connections for efficient message delivery.
Current Setup:
- WebSocket Connection: Implemented a WebSocket service with heartbeat monitoring using
web::ws::WsSink
and state management. - REST API: The goal is to trigger message delivery to WebSocket clients from the REST API handler.
Below is a simplified version of my current implementation:

WebSocket Service:
async fn ws_service(
(sink, mut server, machine_id): (web::ws::WsSink, web::types::State<mpsc::UnboundedSender<ServerMessage>>, String)
) -> Result<impl Service<web::ws::Frame, Response = Option<web::ws::Message>, Error = io::Error>, web::Error> {
let state = Rc::new(RefCell::new(WsState { hb: Instant::now() }));
let (tx, rx) = oneshot::channel();
rt::spawn(heartbeat(state.clone(), sink, rx));
let service = fn_service(move |frame| {
let item = match frame {
web::ws::Frame::Ping(msg) => Some(web::ws::Message::Pong(msg)),
web::ws::Frame::Pong(_) => None,
web::ws::Frame::Text(text) => Some(web::ws::Message::Text(
String::from_utf8(Vec::from(text.as_ref())).unwrap().into(),
)),
_ => None,
};
ready(Ok(item))
});
Ok(chain(service).and_then(fn_shutdown(move || {
let _ = tx.send(());
})))
}
REST API and State Management:
async fn ws_index(
req: web::HttpRequest,
state: web::types::State<mpsc::UnboundedSender<ServerMessage>>,
path: web::types::Path<String>
) -> Result<web::HttpResponse, web::Error> {
let machine_id = path.clone();
let config = map_config(fn_factory_with_config(ws_service), move |cfg| {
(cfg, state.clone(), machine_id.clone())
});
web::ws::start(req, config.clone()).await
}
async fn start() -> UnboundedSender<ServerMessage> {
let (tx, mut rx) = mpsc::unbounded();
rt::Arbiter::new().exec_fn(move || {
rt::spawn(async move {
while let Some(msg) = rx.next().await {
info!("Received message: {:?}", msg);
}
rt::Arbiter::current().stop();
});
});
tx
}
#[ntex::main]
async fn main() -> std::io::Result<()> {
env_logger::init();
let state = start().await;
web::server(move || {
web::App::new()
.state(state.clone())
.wrap(web::middleware::Logger::default())
.service(web::resource("/ws/{machine_id}").route(web::get().to(ws_index)))
.service(notify_machine)
.service(health)
})
.bind("127.0.0.1:8080")?
.run()
.await
}
Key Issues:
- Mapping Machine ID to Connections: How do I efficiently map
machine_id
to WebSocket connections (WsSink
) for targeted communication? - Sending Messages: How do I send a message to a specific WebSocket client (
machine_id
) when the/notify-machine
REST API is triggered? - State Management: What is the best approach to maintain the state of connected clients and their associations with
machine_id
?
Desired Behavior:
- When a client connects to
/ws/{machine_id}
, the connection should be registered with itsmachine_id
. - When the
/notify-machine
endpoint receives a POST request with amachine_id
and a message, the corresponding WebSocket client should receive that message in real time. - The WebSocket clients are behind NAT, so I cannot use HTTP to reach them. WebSockets are the only viable solution for real-time communication.
What I’ve Tried:
I referred to [ntex examples](https://github.com/ntex-rs/examples), but did not help. I am unsure how to manually send messages to specific clients from the REST API. For instance, in this [example snippet](https://github.com/anchalshivank/mt-notify-service/blob/8ca284333eb44eeadd8568631b4e86d61e7b03a8/src/main.rs#L59), I see how to send responses, but I need help sending messages proactively to connected clients.
Any guidance, examples, or advice for achieving this functionality would be greatly appreciated!
Metadata
Metadata
Assignees
Labels
No labels