Skip to content

Commit a851f99

Browse files
committed
rust: add mqtt exporter
Fix minor comments. Create a collector str topic allowing 5 levels of routed str topic. Signed-off-by: Frédéric Gardes <[email protected]>
1 parent b703db0 commit a851f99

File tree

6 files changed

+1370
-512
lines changed

6 files changed

+1370
-512
lines changed

rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ thiserror = "2.0"
6060
threadpool = "1.8"
6161
flexi_logger = "0.31.2"
6262
rand = "0.9.2"
63+
tempfile = "3.21.0"
6364

6465
[dependencies.base64]
6566
version = "0.22"

rust/README.md

Lines changed: 160 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ libits-client
55
[![crates.io](https://img.shields.io/crates/v/libits-client)][2]
66

77
This crate provides IoT3 [MQTT][3] and [OpenTelemetry][4] generic clients and,
8-
on top of this an [ETSI][5] [Intelligent Transport System][6] messages implementation using [JSON][7]
8+
on top of this, an [ETSI][5] [Intelligent Transport System][6] messages implementation using [JSON][7]
99

1010
Examples
1111
--------
1212

1313
### Common environment
1414

15-
1. Let's be sure to have unrestricted access to _test.mosquitto.org_ (IPv4 and IPv6)
15+
1. Let's be sure to have unrestricted access to [test.mosquitto.org](https://test.mosquitto.org/) (IPv4 and IPv6)
1616
2. In a terminal, prepare a collector implementing the OpenTelemetry API, on localhost. If you don't have one,
1717
you may use an existing one, like using docker:
1818
```shell
1919
docker container run \
20+
--name jaeger \
2021
--rm \
2122
-p 16686:16686 \
2223
-p 4318:4318 \
@@ -146,7 +147,7 @@ INFO [libits::client::application::pipeline] starting MQTT publishing thread...
146147
...
147148
```
148149

149-
**Note: this example does not send any message so it has to be used with a sender example from the python
150+
**Note: this example does not send any message, so it has to be used with a sender example from the python
150151
implementation to work relevantly**
151152

152153
You can manually send an information and a stopped CAM message with the following commands:
@@ -159,7 +160,7 @@ docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.
159160
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t default/outQueue/v2x/cam/com_car_555/0/3/1/3/3/3/1/1/1/2/0/2/1/0/0/1/2/1/2/1/2/1 --tls-version tlsv1.2 --capath /etc/ssl/certs/ -m "{\"message_type\":\"cam\",\"origin\":\"self\",\"version\":\"2.2.0\",\"source_uuid\":\"com_car_555\",\"timestamp\":1742227617044,\"message\":{\"protocol_version\":1,\"station_id\":555,\"generation_delta_time\":64291,\"basic_container\":{\"station_type\":5,\"reference_position\":{\"latitude\":447753167,\"longitude\":-6518623,\"position_confidence_ellipse\":{\"semi_major\":10,\"semi_minor\":50,\"semi_major_orientation\":1},\"altitude\":{\"value\":14750,\"confidence\":1}}},\"high_frequency_container\":{\"basic_vehicle_container_high_frequency\":{\"heading\":{\"value\":1800,\"confidence\":2},\"speed\":{\"value\":0,\"confidence\":3},\"drive_direction\":0,\"vehicle_length\":{\"value\":40,\"confidence\":0},\"vehicle_width\":20,\"longitudinal_acceleration\":{\"value\":10,\"confidence\":2},\"curvature\":{\"value\":11,\"confidence\":4},\"curvature_calculation_mode\":0,\"yaw_rate\":{\"value\":562,\"confidence\":2}}}}}"
160161
```
161162

162-
NB: you can alse use the deprecated version 1.1.3 of a CAM message:
163+
NB: you can also use the deprecated version 1.1.3 of a CAM message:
163164

164165
```shell
165166
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t default/outQueue/v2x/cam/com_car_555/0/3/1/3/3/3/1/1/1/2/0/2/1/0/0/1/2/1/2/1/2/1 --tls-version tlsv1.2 --capath /etc/ssl/certs/ -m "{\"type\":\"cam\",\"origin\":\"self\",\"version\":\"1.1.3\",\"source_uuid\":\"com_car_555\",\"timestamp\":1742227617044,\"message\":{\"protocol_version\":1,\"station_id\":555,\"generation_delta_time\":64291,\"basic_container\":{\"station_type\":5,\"reference_position\":{\"latitude\":447753167,\"longitude\":-6518623,\"altitude\":14750},\"confidence\":{\"position_confidence_ellipse\":{\"semi_major_confidence\":10,\"semi_minor_confidence\":50,\"semi_major_orientation\":1},\"altitude\":1}},\"high_frequency_container\":{\"heading\":3601,\"speed\":0,\"longitudinal_acceleration\":161,\"drive_direction\":0,\"vehicle_length\":40,\"vehicle_width\":20,\"confidence\":{\"heading\":2,\"speed\":3,\"vehicle_length\":0}}}}"
@@ -206,19 +207,33 @@ This example subscribes to messages and sends it to an exporter.
206207
cargo run --example collector
207208
```
208209
209-
Logs are redirected to output:
210+
Logs are redirected to output. By default, no exporter is used:
210211
211212
```
212-
INFO [libits::client::configuration] Logger ready on stdout
213-
Transport: standard MQTT; TLS enabled
214-
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: #
215-
INFO [collector] Exporter stdout activated
216-
INFO [collector] Exporter file activated on /data/collector with switch each 10000 lines
213+
INFO [libits::client::logger] Logger ready on stdout
214+
INFO [libits::transport::mqtt] Transport: standard MQTT; TLS enabled
215+
INFO [collector] Receiver on ["#"]
216+
INFO [collector] Exporter stdout not configured: Could not found field 'stdout'
217+
INFO [collector] Exporter file not configured: Could not found field 'file'
218+
INFO [collector] Exporter mqtt not configured: Could not found field 'mqtt'
219+
INFO [collector] Exporter stdout deactivated
220+
INFO [collector] Exporter file deactivated
217221
INFO [collector] Exporter mqtt deactivated
222+
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: #
218223
...
219224
```
220225
221-
The stdout export prints it to the console, with the logs:
226+
If you want to use an exporter, you can configure it in the configuration file.
227+
228+
You can activate a `stdout` exporter to write the messages to the console:
229+
230+
```config
231+
[exporter]
232+
# optional, true to export the received messages to the console, default to false
233+
stdout = true
234+
```
235+
236+
The `stdout` exporter prints it to the console, with the logs:
222237
223238
```
224239
...
@@ -233,7 +248,18 @@ value
233248
...
234249
```
235250
236-
The file export saves it to a file, rotating and compressing each 10000 lines:
251+
You can activate a `file` exporter to write the messages to a file:
252+
253+
```config
254+
# optional, true to export the received messages to files, default to false
255+
file = true
256+
# optional, directory where to store the files, default to '/data/collector'
257+
#file_directory = "/data/collector"
258+
# optional, number of lines stored before the file is rotated, default to 10000
259+
#file_nb_line = 10000
260+
```
261+
262+
The `file` exporter saves it to a file, rotating and compressing each 10000 lines:
237263
238264
```shell
239265
cat /data/collector/*.log | wc -l && ls -lh /data/collector/
@@ -249,6 +275,128 @@ total 1,3M
249275
-rw-rw-r-- 1 user group 452K abr. 11 10:41 collector_20250411_104148_210.log
250276
```
251277
278+
You can activate a `mqtt` exporter to write the messages to a(nother) broker:
279+
280+
```config
281+
# optional, true to export the received messages to a mqtt broker, default to false
282+
mqtt = true
283+
# broker host to export to
284+
host = test.mosquitto.org
285+
# broker port is the port to export to
286+
port = 1883
287+
# true to use the TLS protocol
288+
use_tls = false
289+
# true to use the MQTT WebSocket protocol
290+
use_websocket = false
291+
# client id to provide at the connection
292+
client_id = com_app_its-exporter-1
293+
# optional, connection timeout
294+
#connection_timeout = 60
295+
# optional, ACL username
296+
#username = username
297+
# optional, ACL password
298+
#password = password
299+
# optional, list of topic level to update with its new value. e.g. "1=default","2=exporter"
300+
topic_level_update_list = "1=collector"
301+
```
302+
303+
The `mqtt` exporter copies the messages to `test.mosquitto.org`
304+
on the port `1883` without TLS neither webSocket,
305+
using the client id `com_app_its-exporter-1`
306+
and updating topic level 1 with the new value `collector`
307+
(to not loop here, we're using the same broker to receive and publish):
308+
309+
```shell
310+
docker container run -it --rm eclipse-mosquitto mosquitto_sub -h test.mosquitto.org -p 1883 -t "collector/#" -v
311+
```
312+
313+
```
314+
...
315+
collector/homey/shelly-powder-room-dimmer/measure-temperature "34.5"
316+
collector/saccal/em/serial "failed to read modbus !!!"
317+
collector/saccal/em/serial "Retrying 1 ..."
318+
collector/GarageTemperatures/fridgeHumidity "26.5"
319+
collector/GarageTemperatures/mqttTemperatureRec "4267"
320+
collector/GarageTemperatures/garageTemperature "55.9"
321+
collector/GarageTemperatures/garageHumidity "51.3"
322+
collector "23.8"
323+
...
324+
```
325+
326+
You can filter the reception of messages by topics:
327+
328+
```config
329+
[receiver]
330+
# optional, list of topic (with a comma ',' separator) to subscribe to, default to "#"
331+
topic_list = "test/topic1","test/topic2"
332+
# optional, topic level number to put together into the router
333+
#route_level = 1
334+
335+
[exporter]
336+
# optional, true to export the received messages to the console, default to false
337+
stdout = true
338+
```
339+
340+
The `stdout` exporter prints only the messages from the `test/topic1` and `test/topic2` topics:
341+
342+
```shell
343+
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t test/topic1 --capath /etc/ssl/certs/ -m "message of the topic 1"
344+
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t test/topic2 --capath /etc/ssl/certs/ -m "message of the topic 2"
345+
```
346+
347+
```
348+
INFO [libits::client::logger] Logger ready on stdout
349+
INFO [libits::transport::mqtt] Transport: standard MQTT; TLS enabled
350+
INFO [collector] Receiver on ["test/topic1", "test/topic2"]
351+
INFO [collector] Exporter file not configured: Could not found field 'file'
352+
INFO [collector] Exporter mqtt not configured: Could not found field 'mqtt'
353+
INFO [collector] Exporter stdout activated
354+
INFO [collector] Exporter file deactivated
355+
INFO [collector] Exporter mqtt deactivated
356+
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: test/topic1
357+
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: test/topic2
358+
message of the topic 1
359+
message of the topic 2
360+
```
361+
362+
You can filter the reception of messages by topics including wld cards (`+` and/or `#`)
363+
and indicate the topic level to put together into the router:
364+
365+
```config
366+
[receiver]
367+
# optional, list of topic (with a comma ',' separator) to subscribe to, default to "#"
368+
topic_list = "test/topic3/#","test/topic4/+/data"
369+
# optional, topic level number to put together into the router
370+
route_level = 2
371+
372+
[exporter]
373+
# optional, true to export the received messages to the console, default to false
374+
stdout = true
375+
```
376+
377+
The `stdout` exporter prints only the messages from the `test/topic3/#` and `test/topic4/+/data` topics
378+
and groups all the messages on two routes of level 2, so `test/topic3` or `test/topic4`:
379+
380+
```shell
381+
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t test/topic3/subinformation --capath /etc/ssl/certs/ -m "message of the topic 3"
382+
docker container run -it --rm eclipse-mosquitto mosquitto_pub -h test.mosquitto.org -p 8886 -t test/topic4/subinformation/data --capath /etc/ssl/certs/ -m "message of the topic 4"
383+
```
384+
385+
```
386+
INFO [libits::client::logger] Logger ready on stdout
387+
INFO [libits::transport::mqtt] Transport: standard MQTT; TLS enabled
388+
INFO [collector] Receiver on ["test/topic3/#", "test/topic4/+/data"] with the route level 2
389+
INFO [collector] Exporter file not configured: Could not found field 'file'
390+
INFO [collector] Exporter mqtt not configured: Could not found field 'mqtt'
391+
INFO [collector] Exporter stdout activated
392+
INFO [collector] Exporter file deactivated
393+
INFO [collector] Exporter mqtt deactivated
394+
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: test/topic3
395+
INFO [libits::transport::mqtt::mqtt_router] Registered route for topic: test/topic4
396+
message of the topic 3
397+
message of the topic 4
398+
```
399+
252400
If the `telemetry` features is enabled both message reception and publish are traced;
253401
it requires an OTLP collector as mentioned in the telemetry example section.
254402

0 commit comments

Comments
 (0)