-
Notifications
You must be signed in to change notification settings - Fork 280
Open
Description
I started the consumer first and then the producer, but the consumer didn't receive anything.
When I published with the MQTTX client instead, the consumer received the messages.
https://mqttx.app/downloads
producer.rs
use rumqttc::{Client, MqttOptions, QoS};
use tokio::{task, time};
use std::time::Duration;
#[tokio::main]
async fn main() {
// 1. 配置 MQTT 连接参数
let mut mqttoptions = MqttOptions::new("producer_1", "172.17.0.7", 1883);
mqttoptions
.set_keep_alive(Duration::from_secs(1))
.set_clean_session(false);
// 2. 创建客户端并连接
let (client, mut eventloop) = Client::new(mqttoptions, 15);
// 3. 发布消息
task::spawn(async move {
let mut i: i32 = 10;
loop {
println!("Sending message {}", i);
client.publish("hello/rumqtt", QoS::AtMostOnce, false, vec![10; 10 as usize]).unwrap();
time::sleep(Duration::from_secs(1)).await;
i += 1;
}
});
// 保持主线程运行
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
consumer.rs
use rumqttc::{AsyncClient, MqttOptions, QoS, Event, Incoming};
use tokio::task;
use std::time::Duration;
/// Consumer entry point: subscribes to `hello/rumqtt` and prints every
/// publish it receives.
///
/// The subscription is requested each time the broker acknowledges a
/// connection, because the session is configured as clean and the broker
/// therefore keeps no subscriptions across reconnects. Connection errors are
/// logged and polling continues, instead of stopping the receive loop while
/// the process stays alive.
#[tokio::main]
async fn main() {
    // 1. Configure the MQTT connection parameters.
    let mut mqttoptions = MqttOptions::new("consumer_1", "172.17.0.7", 1883);
    mqttoptions
        .set_keep_alive(Duration::from_secs(5))
        .set_clean_session(true);

    // 2. Create the client handle and its event loop (request queue of 10).
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // 3. Drive the event loop in a task: subscribe on connect, print messages.
    let receiver = task::spawn(async move {
        loop {
            match eventloop.poll().await {
                Ok(Event::Incoming(Incoming::ConnAck(_))) => {
                    // (Re)subscribe on every successful connection.
                    match client.subscribe("hello/rumqtt", QoS::AtLeastOnce).await {
                        Ok(()) => println!("Subscribed to topic"),
                        Err(e) => eprintln!("Failed to subscribe: {:?}", e),
                    }
                }
                Ok(Event::Incoming(Incoming::Publish(packet))) => {
                    // Payload is shown lossily; it is not required to be UTF-8.
                    let payload = String::from_utf8_lossy(&packet.payload);
                    println!(
                        "Received message:\nTopic: {}\nPayload: {}\nQoS: {:?}",
                        packet.topic, payload, packet.qos
                    );
                }
                // Other events are not handled for now.
                Ok(_) => {}
                Err(e) => {
                    eprintln!("Connection error: {:?}", e);
                    // Back off, then poll again so the client can reconnect.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
        }
    });

    // 4. Keep the process alive for as long as the receive task runs.
    if let Err(e) = receiver.await {
        eprintln!("Receiver task failed: {:?}", e);
    }
}
Cargo.toml
[package]
name = "rust_rmqtt"
version = "0.1.0"
edition = "2021"
[dependencies]
# rmqtt = "0.0.1"
rumqttc = "0.24.0"
tokio = { version = "1.27.0", features = ["full"] }
[[bin]]
name = "producer"
path = "src/bin/producer.rs"
[[bin]]
name = "consumer"
path = "src/bin/consumer.rs"
rmqtt docker
docker run -d \
--name rmqtt \
-p 1883:1883 \
-p 8883:8883 \
-p 11883:11883 \
-p 6060:6060 \
rmqtt/rmqtt:0.11.0

Metadata
Metadata
Assignees
Labels
No labels