Skip to content
18 changes: 13 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ rust-version = "1.82"
[patch.crates-io]
esp-idf-sys = { git = "https://github.com/esp-rs/esp-idf-sys" }
esp-idf-hal = { git = "https://github.com/esp-rs/esp-idf-hal" }

[lib]
harness = false

[features]
default = ["std", "binstart"]

std = ["alloc", "log/std", "esp-idf-hal/std", "embedded-svc/std", "futures-io"]
std = [
"alloc",
"log/std",
"esp-idf-hal/std",
"embedded-svc/std",
"embedded-svc/mqtt_protocol_v5",
"futures-io",
]
embassy-time-driver = ["dep:embassy-time-driver", "embassy-time-queue-utils"]
alloc = ["esp-idf-hal/alloc", "embedded-svc/alloc", "uncased/alloc"]
nightly = ["embedded-svc/nightly", "esp-idf-hal/nightly"]
Expand Down Expand Up @@ -56,9 +62,11 @@ uncased = { version = "0.9.7", default-features = false }
embedded-hal-async = { version = "1", default-features = false }
embedded-svc = { version = "0.28", default-features = false }
esp-idf-hal = { version = "0.45", default-features = false }
embassy-time-driver = { version = "0.2.1", optional = true, features = ["tick-hz-1_000_000"] }
embassy-time-queue-utils = { version = "0.3", optional = true }
embassy-futures = "0.1.2"
embassy-time-driver = { version = "0.2", optional = true, features = [
"tick-hz-1_000_000",
] }
embassy-time-queue-utils = { version = "0.1", optional = true }
embassy-futures = "0.1"
embedded-storage = { version = "0.3", optional = true }
futures-io = { version = "0.3", optional = true }

Expand Down
3 changes: 3 additions & 0 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
//! MQTT is a lightweight publish/subscribe messaging protocol.

pub mod client;

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub mod client5;
203 changes: 194 additions & 9 deletions src/mqtt/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@
use core::ffi::c_void;
use core::fmt::Debug;
use core::{slice, time};
#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
use std::vec::Vec;

extern crate alloc;
use alloc::boxed::Box;
use alloc::sync::Arc;

use embedded_svc::mqtt::client::{asynch, Client, Connection, Enqueue, ErrorType, Publish};

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
use embedded_svc::mqtt::client5::{MessageMetadata, SubscribePropertyConfig, UserPropertyItem};

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
use embedded_svc::mqtt::client5::UserPropertyList;

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
use crate::mqtt::client5::EspUserPropertyList;

use crate::private::unblocker::Unblocker;
use crate::sys::*;

Expand All @@ -25,17 +36,31 @@ pub use embedded_svc::mqtt::client::{
#[allow(unused_imports)]
pub use super::*;

fn u8ptr_to_str<'a>(ptr: *const u8, len: usize) -> Option<&'a str> {
if ptr.is_null() || len == 0 {
return None;
}

// SAFETY: The pointer is assumed to be valid and the length is non-zero.
let slice: &'a [u8] = unsafe { core::slice::from_raw_parts(ptr, len) };
core::str::from_utf8(slice).ok()
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum MqttProtocolVersion {
V3_1,
V3_1_1,
#[cfg(esp_idf_mqtt_protocol_5)]
V5,
}

impl From<MqttProtocolVersion> for esp_mqtt_protocol_ver_t {
fn from(pv: MqttProtocolVersion) -> Self {
match pv {
MqttProtocolVersion::V3_1 => esp_mqtt_protocol_ver_t_MQTT_PROTOCOL_V_3_1,
MqttProtocolVersion::V3_1_1 => esp_mqtt_protocol_ver_t_MQTT_PROTOCOL_V_3_1_1,
#[cfg(esp_idf_mqtt_protocol_5)]
MqttProtocolVersion::V5 => esp_mqtt_protocol_ver_t_MQTT_PROTOCOL_V_5,
}
}
}
Expand Down Expand Up @@ -337,22 +362,24 @@ impl<'a> TryFrom<&'a MqttClientConfiguration<'a>>
}
}

struct UnsafeCallback<'a>(*mut Box<dyn FnMut(esp_mqtt_event_handle_t) + Send + 'a>);
pub(crate) struct UnsafeCallback<'a>(*mut Box<dyn FnMut(esp_mqtt_event_handle_t) + Send + 'a>);

impl<'a> UnsafeCallback<'a> {
fn from(boxed: &mut Box<Box<dyn FnMut(esp_mqtt_event_handle_t) + Send + 'a>>) -> Self {
pub(crate) fn from(
boxed: &mut Box<Box<dyn FnMut(esp_mqtt_event_handle_t) + Send + 'a>>,
) -> Self {
Self(boxed.as_mut())
}

unsafe fn from_ptr(ptr: *mut c_void) -> Self {
pub(crate) unsafe fn from_ptr(ptr: *mut c_void) -> Self {
Self(ptr as *mut _)
}

fn as_ptr(&self) -> *mut c_void {
pub(crate) fn as_ptr(&self) -> *mut c_void {
self.0 as *mut _
}

unsafe fn call(&self, data: esp_mqtt_event_handle_t) {
pub(crate) unsafe fn call(&self, data: esp_mqtt_event_handle_t) {
let reference = self.0.as_mut().unwrap();

(reference)(data);
Expand Down Expand Up @@ -518,10 +545,29 @@ impl<'a> EspMqttClient<'a> {
self.subscribe_cstr(to_cstring_arg(topic)?.as_c_str(), qos)
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub fn subscribe_with_config<'ab>(
&mut self,
topic: &str,
qos: QoS,
config: SubscribePropertyConfig<'ab>,
) -> Result<MessageId, EspError> {
self.subscribe_with_config_cstr(to_cstring_arg(topic)?.as_c_str(), qos, config)
}

pub fn unsubscribe(&mut self, topic: &str) -> Result<MessageId, EspError> {
self.unsubscribe_cstr(to_cstring_arg(topic)?.as_c_str())
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub fn unsubscribe_with_config<'ab>(
&mut self,
topic: &str,
config: SubscribePropertyConfig<'ab>,
Comment on lines +562 to +566
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter type mismatch: The parameter 'config' should be of type 'UnsubscribePropertyConfig' not 'SubscribePropertyConfig'. This appears to be a copy-paste error from the subscribe_with_config method.

Copilot uses AI. Check for mistakes.
) -> Result<MessageId, EspError> {
self.unsubscribe_with_config_cstr(to_cstring_arg(topic)?.as_c_str(), config)
}

pub fn publish(
&mut self,
topic: &str,
Expand Down Expand Up @@ -576,10 +622,61 @@ impl<'a> EspMqttClient<'a> {
res
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub fn subscribe_with_config_cstr(
&mut self,
topic: &core::ffi::CStr,
qos: QoS,
config: SubscribePropertyConfig<'_>,
) -> Result<MessageId, EspError> {
let property = esp_mqtt5_subscribe_property_config_t {
subscribe_id: config.subscribe_id,
no_local_flag: config.no_local,
retain_as_published_flag: config.retain_as_published,
retain_handle: config.retain_handling,
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect cast: The retain_handling field is being cast with 'as _' without proper type checking. The field should match the expected C enum type. Consider using explicit type conversion or validating the value.

Copilot uses AI. Check for mistakes.
is_share_subscribe: config.share_name.is_some(),
share_name: config.share_name.map_or(core::ptr::null(), |s| s.as_ptr()),
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential use-after-free: The share_name pointer is obtained from calling as_ptr() on a temporary Option value. This pointer will become invalid once the expression completes. Consider using proper C string lifetime management with RawCstrs or ensuring the string data outlives the struct.

Copilot uses AI. Check for mistakes.
user_property: if let Some(ref user_properties) = config.user_properties {
EspUserPropertyList::from(user_properties).as_ptr()
} else {
mqtt5_user_property_handle_t::default()
},
Comment on lines +639 to +643
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak: EspUserPropertyList::from creates a new user property list that is never properly cleaned up. The handle returned by as_ptr() will have its memory leaked after the property struct is dropped, as the Drop implementation only clears user properties if they are stored in the Option field.

Copilot uses AI. Check for mistakes.
};

Self::check(unsafe {
esp_mqtt5_client_set_subscribe_property(self.raw_client, &property as *const _)
})?;

self.subscribe_cstr(topic, qos)
}

pub fn unsubscribe_cstr(&mut self, topic: &core::ffi::CStr) -> Result<MessageId, EspError> {
Self::check(unsafe { esp_mqtt_client_unsubscribe(self.raw_client, topic.as_ptr()) })
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub fn unsubscribe_with_config_cstr<'ab>(
&mut self,
topic: &core::ffi::CStr,
config: SubscribePropertyConfig<'ab>,
Comment on lines +657 to +661
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter type mismatch: The parameter 'config' should be of type 'UnsubscribePropertyConfig' not 'SubscribePropertyConfig'. This is creating an incorrect property configuration for unsubscribe operations.

Copilot uses AI. Check for mistakes.
) -> Result<MessageId, EspError> {
let property = esp_mqtt5_unsubscribe_property_config_t {
is_share_subscribe: config.share_name.is_some(),
share_name: config.share_name.map_or(core::ptr::null(), |s| s.as_ptr()),
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential use-after-free: The share_name pointer is obtained from calling as_ptr() on a temporary Option value. This pointer will become invalid once the expression completes. Consider using proper C string lifetime management.

Copilot uses AI. Check for mistakes.
user_property: if let Some(ref user_properties) = config.user_properties {
EspUserPropertyList::from(user_properties).as_ptr()
} else {
mqtt5_user_property_handle_t::default()
},
Comment on lines +666 to +670
Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak: EspUserPropertyList::from creates a new user property list that is never properly cleaned up. The handle returned by as_ptr() will have its memory leaked after the function returns.

Copilot uses AI. Check for mistakes.
};

Self::check(unsafe {
esp_mqtt5_client_set_unsubscribe_property(self.raw_client, &property as *const _)
})?;

self.unsubscribe_cstr(topic)
}

pub fn publish_cstr(
&mut self,
topic: &core::ffi::CStr,
Expand Down Expand Up @@ -674,7 +771,7 @@ impl ErrorType for EspMqttClient<'_> {
type Error = EspError;
}

impl Client for EspMqttClient<'_> {
impl<'a> Client for EspMqttClient<'a> {
fn subscribe(&mut self, topic: &str, qos: QoS) -> Result<MessageId, Self::Error> {
EspMqttClient::subscribe(self, topic, qos)
}
Expand Down Expand Up @@ -711,8 +808,8 @@ impl Enqueue for EspMqttClient<'_> {
unsafe impl Send for EspMqttClient<'_> {}

pub struct EspMqttConnection {
receiver: Receiver<EspMqttEvent<'static>>,
given: bool,
pub(crate) receiver: Receiver<EspMqttEvent<'static>>,
pub(crate) given: bool,
}

impl EspMqttConnection {
Expand Down Expand Up @@ -926,6 +1023,7 @@ impl EspAsyncMqttClient {
AsyncCommand::Subscribe { qos } => {
let topic =
unsafe { core::ffi::CStr::from_bytes_with_nul_unchecked(&work.topic) };

Copy link

Copilot AI Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary blank line added. Remove this blank line to maintain consistent code style.

Suggested change

Copilot uses AI. Check for mistakes.
work.result = client.subscribe_cstr(topic, qos);
}
AsyncCommand::Unsubscribe => {
Expand Down Expand Up @@ -1014,7 +1112,7 @@ static ERROR: EspError = EspError::from_infallible::<ESP_FAIL>();
pub struct EspMqttEvent<'a>(&'a esp_mqtt_event_t);

impl<'a> EspMqttEvent<'a> {
const fn new(event: &'a esp_mqtt_event_t) -> Self {
pub(crate) const fn new(event: &'a esp_mqtt_event_t) -> Self {
Self(event)
}

Expand Down Expand Up @@ -1083,6 +1181,83 @@ impl<'a> EspMqttEvent<'a> {
other => panic!("Unknown message type: {other}"),
}
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
pub fn metadata<'ab>(&self) -> Option<MessageMetadata<'ab>> {
let ptr = self.0.property;

if ptr.is_null() {
return None;
}

let payload_format_indicator = unsafe { (*ptr).payload_format_indicator };
let response_topic: Option<&'ab str> = unsafe {
let topic = (*ptr).response_topic;
let len = (*ptr).response_topic_len;
u8ptr_to_str(topic, len as _)
};

let correlation_data: Option<&'ab [u8]> = unsafe {
let data = (*ptr).correlation_data;
if data.is_null() {
None
} else {
Some(core::slice::from_raw_parts(
data,
(*ptr).correlation_data_len as usize,
))
}
};

let content_type: Option<&'ab str> = unsafe {
let content_type = (*ptr).content_type;
let len = (*ptr).content_type_len;
u8ptr_to_str(content_type, len as _)
};

let subscribe_id = unsafe { (*ptr).subscribe_id };

let event_property = MessageMetadata::new(
payload_format_indicator,
response_topic,
correlation_data,
content_type,
subscribe_id,
);
Some(event_property)
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
fn user_properties<'ab>(&self) -> Result<Vec<UserPropertyItem<'ab>>, EspError> {
let count = self.user_properties_count();
if count == 0 {
return Ok(Vec::new());
}

let ptr = self.0.property;
if ptr.is_null() {
return Ok(Vec::new());
}
let table: *mut mqtt5_user_property_list_t = unsafe { (*ptr).user_property };
if table.is_null() {
return Ok(Vec::new());
}

Ok(EspUserPropertyList::from_handle(table).get_items()?)
}

fn user_properties_count(&self) -> u8 {
let ptr = self.0.property;
if ptr.is_null() {
return 0;
}
let table: *mut mqtt5_user_property_list_t = unsafe { (*ptr).user_property };
if table.is_null() {
return 0;
}
let table = EspUserPropertyList::from_handle(table);
table.count()
}
}

/// SAFETY: EspMqttEvent contains no thread-specific data.
Expand All @@ -1099,4 +1274,14 @@ impl Event for EspMqttEvent<'_> {
fn payload(&self) -> EventPayload<'_, Self::Error> {
EspMqttEvent::payload(self)
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
fn metadata<'a>(&self) -> Option<MessageMetadata<'a>> {
EspMqttEvent::metadata(self)
}

#[cfg(all(esp_idf_mqtt_protocol_5, feature = "std"))]
fn user_properties<'ab>(&self) -> Result<Vec<UserPropertyItem<'ab>>, Self::Error> {
EspMqttEvent::user_properties(self)
}
}
Loading
Loading