Skip to content

Commit

Permalink
* MqttSubscription PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
psobiech committed Dec 29, 2023
1 parent 42da518 commit df315f9
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 49 deletions.
30 changes: 23 additions & 7 deletions MQTT.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Generate Certificates

```
```bash
mkdir -p mqtt/config mqtt/data mqtt/log
cd mqtt
git clone https://github.com/fcgdam/easy-ca.git
Expand All @@ -15,7 +15,8 @@ cd ../config/certs
# Configure mosquitto

## ./mqtt/config/mosquitto.conf
```

```properties
persistence true
persistence_location /mosquitto/data/
log_dest stdout
Expand Down Expand Up @@ -43,7 +44,8 @@ tls_version tlsv1.2
```

## ./mqtt/docker-compose.yml
```

```dockerfile
version: '3.8'

services:
Expand All @@ -65,7 +67,8 @@ networks:
```

## Run MQTT broker
```

```bash
❯ docker-compose up
[+] Running 1/0
✔ Container mqtt-mosquitto-1 Created 0.0s
Expand All @@ -80,16 +83,29 @@ mosquitto-1 | 1703839960: mosquitto version 2.0.18 running

## Test MQTT

```
```bash
mosquitto_pub --cafile ./mqtt/config/certs/ca/ca.crt -h localhost -t "topic" -m "message" -p 8883 -d --cert ./mqtt/config/certs/certs/user1.client.crt --key ./mqtt/config/certs/private/user1.client.key
```

# Configure VCLU

Copy certificates and CLU private key into CLU runtime directory
```

```bash
cp ./mqtt/config/certs/ca/ca.crt ./runtime/root/a/MQTT-ROOT.CRT``
cp ./mqtt/config/certs/certs/clu0.client.crt ./runtime/root/a/MQTT-PUBLIC.CRT
cp ./mqtt/config/certs/private/clu0.client.key ./runtime/root/a/MQTT-PRIVATE.PEM
```

Run VCLU and enable UseMQTT in OM.
Run VCLU and enable UseMQTT in OM.

## MqttSubscription

Example onMessage script:
```lua
-- read current message message
CLU0->AddToLog(CLU0->mqttTopic->Message)

-- unblock next message
CLU0->mqttTopic->NextMessage()
```
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,13 @@
</events>
</interface>
<objects>
<!-- <object maxInstances="-1" name="Timer" version="1"/>-->
<object maxInstances="-1" name="MqttSubscription" version="1"/>
<object maxInstances="-1" name="Timer" version="1"/>
<!-- <object maxInstances="-1" name="Calendar" version="1"/>-->
<!-- <object maxInstances="64" name="Scheduler" version="1"/>-->
<!-- <object maxInstances="64" name="SunriseSunsetCalendar" version="3"/>-->
<!-- <object maxInstances="64" name="PresenceSensor" version="2"/>-->
<!-- <object maxInstances="64" name="EventScheduler" version="1"/>-->
<!-- <object maxInstances="-1" name="Scheduler" version="1"/>-->
<!-- <object maxInstances="-1" name="SunriseSunsetCalendar" version="3"/>-->
<!-- <object maxInstances="-1" name="PresenceSensor" version="2"/>-->
<!-- <object maxInstances="-1" name="EventScheduler" version="1"/>-->
<!-- <object maxInstances="-1" name="HttpListener" version="2"/>-->
<!-- <object maxInstances="-1" name="HttpRequest" version="2"/>-->
</objects>
Expand All @@ -183,7 +184,7 @@
<option name="accept_tftp" value="true"/>
<option name="accept_udp" value="true"/>
<option name="accept_tcp" value="false"/>
<option name="maxObjects" value="9999"/>
<option name="maxObjects" value="400"/>
<option name="maxTfBusModules" value="0"/>
<option name="maxZwaveModules" value="0"/>
</options>
Expand Down
35 changes: 35 additions & 0 deletions runtime/device-interfaces/object_mqtt_subscription_v1.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<object class="300" name="MqttSubscription" version="1">
<features>
<feature get="true" index="0" name="Topic" set="true" type="str" unit="string">
<hint lang="pl" value="Topic"/>
<hint lang="en" value="Topic"/>
<desc resKey="mqttsubscription_topic"/>
</feature>
<feature get="true" index="1" name="Message" set="false" type="str" unit="string">
<hint lang="pl" value="Zawartość wiadomości MQTT"/>
<hint lang="en" value="The content of MQTT message"/>
<desc resKey="mqttsubscription_message"/>
</feature>
</features>
<methods>
<method call="set" index="0" name="SetTopic" return="void">
<param name="Topic" type="str" unit="string"/>
<hint lang="pl" value="Zmienia topic"/>
<hint lang="en" value="Changes topic"/>
<desc resKey="mqttsubscription_settopic"/>
</method>
<method call="execute" index="0" name="NextMessage" return="void">
<hint lang="pl" value="Oznacza wiadomość jako otrzymaną"/>
<hint lang="en" value="Marks message as received"/>
<desc resKey="mqttsubscription_nextmessage"/>
</method>
</methods>
<events>
<event address="0" name="OnMessage">
<hint lang="pl" value="Zdarzenie wywoływane w momencie otrzymania wiadomości"/>
<hint lang="en" value="Event occurring when the message is received"/>
<desc resKey="mqttsubscription_onmessage"/>
</event>
</events>
</object>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* OpenGr8on, open source extensions to systems based on Grenton devices
* Copyright (C) 2023 Piotr Sobiech
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package pl.psobiech.opengr8on.vclu;

import java.util.concurrent.LinkedBlockingDeque;

import org.luaj.vm2.LuaValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttSubscription extends VirtualObject {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttSubscription.class);

private final LinkedBlockingDeque<byte[]> messageQueue = new LinkedBlockingDeque<>();

public MqttSubscription(String name) {
super(name);

methodFunctions.put(0, this::onNextMessage); // mqttsubscription_nextmessage
}

private LuaValue onNextMessage(LuaValue arg1) {
featureValues.remove(1);

return LuaValue.NIL;
}

public String getTopic() {
return String.valueOf(featureValues.get(0).checkstring());
}

public void enqueueMessage(int id, byte[] payload) {
while (!messageQueue.offer(payload)) {
// TODO: retry/fail logic
Thread.yield();
}
}

@Override
public void loop() {
final LuaValue currentPayload = featureValues.get(1); // mqttsubscription_message
if (currentPayload == null) {
final byte[] payload = messageQueue.poll();
if (payload != null) {
featureValues.put(1, LuaValue.valueOf(new String(payload)));

triggerEvent(0); // mqttsubscription_onmessage
}
}
}

@Override
public void close() {
// NOP
}
}
24 changes: 12 additions & 12 deletions vclu/src/main/java/pl/psobiech/opengr8on/vclu/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@
import pl.psobiech.opengr8on.util.SocketUtil.Payload;
import pl.psobiech.opengr8on.util.SocketUtil.UDPSocket;
import pl.psobiech.opengr8on.util.ThreadUtil;
import pl.psobiech.opengr8on.vclu.Main.CluKeys;
import pl.psobiech.opengr8on.vclu.lua.LuaServer;
import pl.psobiech.opengr8on.vclu.lua.LuaServer.LuaThreadWrapper;
import pl.psobiech.opengr8on.vclu.Main.CluKeys;

public class Server implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
Expand Down Expand Up @@ -361,25 +361,25 @@ private Optional<Pair<CipherKey, Command>> onLuaScriptCommand(Payload payload) {
if (requestOptional.isPresent()) {
final LuaScriptCommand.Request request = requestOptional.get();

final LuaValue luaValue;
try {
// when having docker network interfaces,
// OM often picks incorrect/unreachable local address,
// so we need to also save real remote address from udp packet
String script = request.getScript();
if (script.startsWith(CLIENT_REGISTER_METHOD_PREFIX)) {
final String remoteAddress = payload.address().getHostAddress();
// when having docker network interfaces,
// OM often picks incorrect/unreachable local address,
// so we need to also save real remote address from udp packet
String script = request.getScript();
if (script.startsWith(CLIENT_REGISTER_METHOD_PREFIX)) {
final String remoteAddress = payload.address().getHostAddress();

script = CLIENT_REGISTER_METHOD_PREFIX + "\"" + remoteAddress + "\", " + script.substring(CLIENT_REGISTER_METHOD_PREFIX.length());
}
script = CLIENT_REGISTER_METHOD_PREFIX + "\"" + remoteAddress + "\", " + script.substring(CLIENT_REGISTER_METHOD_PREFIX.length());
}

LuaValue luaValue;
try {
luaValue = luaThread.globals()
.load("return %s".formatted(script))
.call();
} catch (LuaError e) {
LOGGER.error(e.getMessage(), e);

return sendError();
luaValue = LuaValue.NIL;
}

String returnValue;
Expand Down
51 changes: 39 additions & 12 deletions vclu/src/main/java/pl/psobiech/opengr8on/vclu/VirtualCLU.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -89,6 +91,8 @@ public class VirtualCLU extends VirtualObject implements Closeable {

private final ScheduledExecutorService executorService;

private final List<MqttSubscription> mqttSubscriptions = new LinkedList<>();

private volatile ZonedDateTime currentDateTime;

private MqttClient mqttClient;
Expand Down Expand Up @@ -118,17 +122,10 @@ public VirtualCLU(String name, Inet4Address address, Path aDriveDirectory) {
return VirtualCLU.this.featureValues.get(21);
}

final boolean mqttEnable = (arg1.isboolean() && arg1.checkboolean()) || (arg1.isint() && arg1.checkint() != 0);
final boolean mqttAlreadyEnabled = mqttClient != null;
if (mqttEnable ^ mqttAlreadyEnabled) {
disableMqtt();

if (mqttEnable) {
enableMqtt();
}
}

return LuaValue.valueOf(mqttEnable);
// Sometimes OM uses true/false and sometimes 0/1
return LuaValue.valueOf(
(arg1.isboolean() && arg1.checkboolean()) || (arg1.isint() && arg1.checkint() != 0)
);
});
featureFunctions.put(22, arg1 -> { // clu_mqttconnection
return LuaValue.valueOf(
Expand Down Expand Up @@ -160,6 +157,19 @@ public void setup() {
triggerEvent(0); // clu_oninit
}

@Override
public void loop() {
final boolean mqttEnable = VirtualCLU.this.featureValues.get(21).checkboolean();
final boolean mqttAlreadyEnabled = mqttClient != null;
if (mqttEnable ^ mqttAlreadyEnabled) {
disableMqtt();

if (mqttEnable) {
enableMqtt();
}
}
}

private LuaNumber getUptime(LuaValue arg1) {
return valueOf(
TimeUnit.MILLISECONDS.toSeconds(
Expand Down Expand Up @@ -296,6 +306,12 @@ public void connectionLost(Throwable throwable) {
@Override
public void messageArrived(String topic, MqttMessage message) {
LOGGER.info("MQTT messageArrived: {} / {}", topic, message);

for (MqttSubscription mqttSubscription : mqttSubscriptions) {
if (topic.startsWith(mqttSubscription.getTopic())) {
mqttSubscription.enqueueMessage(message.getId(), message.getPayload());
}
}
}

@Override
Expand All @@ -317,12 +333,23 @@ public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
);

mqttClient.connect(options);
mqttClient.subscribe("topic", 0);

for (MqttSubscription mqttSubscription : mqttSubscriptions) {
try {
mqttClient.subscribe(mqttSubscription.getTopic());
} catch (MqttException e) {
LOGGER.error(e.getMessage(), e);
}
}
} catch (MqttException e) {
throw new UnexpectedException(e);
}
}

public void addMqttSubscription(MqttSubscription mqttSubscription) {
mqttSubscriptions.add(mqttSubscription);
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

import org.luaj.vm2.LuaFunction;
Expand All @@ -33,13 +34,13 @@ public class VirtualObject implements Closeable {

protected final String name;

protected final Map<Integer, LuaValue> featureValues = new HashMap<>();
protected final Map<Integer, LuaValue> featureValues = new Hashtable<>();

protected final Map<Integer, LuaOneArgFunction> featureFunctions = new HashMap<>();
protected final Map<Integer, LuaOneArgFunction> featureFunctions = new Hashtable<>();

protected final Map<Integer, LuaOneArgFunction> methodFunctions = new HashMap<>();
protected final Map<Integer, LuaOneArgFunction> methodFunctions = new Hashtable<>();

protected final Map<Integer, org.luaj.vm2.LuaFunction> eventFunctions = new HashMap<>();
protected final Map<Integer, org.luaj.vm2.LuaFunction> eventFunctions = new Hashtable<>();

public VirtualObject(String name) {
this.name = name;
Expand Down
Loading

0 comments on commit df315f9

Please sign in to comment.