Skip to content

Commit

Permalink
* Disable IPTOS_RELIABILITY
Browse files Browse the repository at this point in the history
* Revert global listener
  • Loading branch information
psobiech committed Jan 18, 2024
1 parent eb6e817 commit 6e35994
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
if: github.ref == 'refs/heads/develop'
uses: advanced-security/maven-dependency-submission-action@v3
docker:
needs: [testLinux, testWindows]
needs: [testLinux]
runs-on: ubuntu-latest
timeout-minutes: 30

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ public class SocketUtil {
*/
public static final int DEFAULT_TIMEOUT_MILLISECONDS = 9_000;

/**
* Reliability traffic flag value
*/
private static final int IPTOS_RELIABILITY = 0x04;

private SocketUtil() {
// NOP
}
Expand Down Expand Up @@ -121,8 +116,6 @@ public void open() {
try {
this.socket = new DatagramSocket(new InetSocketAddress(address, port));
this.socket.setSoTimeout(DEFAULT_TIMEOUT_MILLISECONDS);
this.socket.setTrafficClass(IPTOS_RELIABILITY);

this.socket.setBroadcast(broadcast);
} catch (IOException e) {
throw new UnexpectedException(e);
Expand Down
65 changes: 12 additions & 53 deletions modules/vclu/src/main/java/pl/psobiech/opengr8on/vclu/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class Server implements Closeable {

private static final long RETRY_DELAY = 100L;

private final ExecutorService executor = ThreadUtil.daemonExecutor("CLUServer");

private final DatagramPacket responsePacket = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE);

private final Path parentDirectory;
Expand All @@ -92,13 +94,11 @@ public class Server implements Closeable {

private final CLUDevice cluDevice;

protected final UDPSocket globalBroadcastSocket;

protected final UDPSocket broadcastSocket;

protected final UDPSocket commandSocket;

private final ExecutorService executor = ThreadUtil.daemonExecutor("CLUServer");
private final UDPSocket responseSocket;

private LuaThread mainThread;

Expand All @@ -117,26 +117,26 @@ public Server(Path rootDirectory, CipherKey projectCipherKey, NetworkInterfaceDt
rootDirectory, projectCipherKey, cluDevice,
// TODO: networkInterface.getBroadcastAddress() does not work with OM
SocketUtil.udpListener(IPv4AddressUtil.BROADCAST_ADDRESS, Client.COMMAND_PORT),
SocketUtil.udpListener(networkInterface.getBroadcastAddress(), Client.COMMAND_PORT),
SocketUtil.udpListener(cluDevice.getAddress(), Client.COMMAND_PORT),
SocketUtil.udpRandomPort(cluDevice.getAddress()),
new TFTPServer(cluDevice.getAddress(), TFTP.DEFAULT_PORT, ServerMode.GET_AND_REPLACE, rootDirectory)
);
}

protected Server(
Path rootDirectory, CipherKey projectCipherKey, CLUDevice cluDevice,
UDPSocket globalBroadcastSocket, UDPSocket broadcastSocket,
UDPSocket commandSocket,
UDPSocket broadcastSocket,
UDPSocket commandSocket, UDPSocket responseSocket,
TFTPServer tftpServer
) {
this.rootDirectory = rootDirectory.toAbsolutePath().normalize();
this.parentDirectory = rootDirectory.getParent();
this.aDriveDirectory = rootDirectory.resolve("a");
this.cluDevice = cluDevice;

this.globalBroadcastSocket = globalBroadcastSocket;
this.broadcastSocket = broadcastSocket;
this.commandSocket = commandSocket;
this.broadcastSocket = broadcastSocket;
this.commandSocket = commandSocket;
this.responseSocket = responseSocket;

this.tftpServer = tftpServer;

Expand All @@ -149,7 +149,7 @@ protected Server(
public void start() {
commandSocket.open();
broadcastSocket.open();
globalBroadcastSocket.open();
responseSocket.open();

executor.execute(() -> {
do {
Expand Down Expand Up @@ -188,43 +188,6 @@ public void start() {
}
);

executor.execute(() -> {
do {
try {
final UUID uuid = UUID.randomUUID();

final Optional<Request> requestOptional = awaitRequestPayload(
String.valueOf(uuid),
globalBroadcastSocket, Duration.ofMillis(TIMEOUT_MILLIS),
broadcastCipherKeys
);
if (requestOptional.isEmpty()) {
continue;
}

final Request request = requestOptional.get();
final Optional<Response> responseOptional = onBroadcastCommand(uuid, request);
if (responseOptional.isEmpty()) {
LOGGER.trace(
"%s\tIGNORED\t<-D--\t%s // %s"
.formatted(uuid, request.payload(), request.cipherKey())
);

continue;
}

respond(uuid, request, responseOptional.get());
} catch (UncheckedInterruptedException e) {
LOGGER.trace(e.getMessage(), e);

break;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
} while (!Thread.interrupted());
}
);

executor.execute(() -> {
do {
try {
Expand Down Expand Up @@ -717,11 +680,7 @@ protected void respond(String uuid, CipherKey cipherKey, Inet4Address ipAddress,
requestPayload.address(), requestPayload.port()
);

try (UDPSocket socket = SocketUtil.udpRandomPort(commandSocket.getLocalAddress())) {
socket.open();

socket.send(requestPacket);
}
responseSocket.send(requestPacket);
}

public CLUDevice getDevice() {
Expand All @@ -733,7 +692,7 @@ public void close() {
ThreadUtil.closeQuietly(executor);

IOUtil.closeQuietly(tftpServer, mqttClient, mainThread);
IOUtil.closeQuietly(commandSocket, broadcastSocket, globalBroadcastSocket);
IOUtil.closeQuietly(commandSocket, broadcastSocket, responseSocket);
}

private record Request(CipherKey cipherKey, Payload payload) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,10 @@ public String fetchValues(List<Subscription> subscriptions) {

@Override
public void close() {
forAllObjects(IOUtil::closeQuietly);
IOUtil.closeQuietly(clientRegistry);
IOUtil.closeQuietly(objectsByName.values());

ThreadUtil.closeQuietly(executor);

IOUtil.closeQuietly(clientRegistry);
}

public void forAllObjects(Consumer<VirtualObject> runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.luaj.vm2.LuaValue;
import org.slf4j.Logger;
Expand All @@ -43,6 +44,8 @@ public class RemoteCLU extends VirtualObject {

private final CLUClient client;

private Future<?> lastfuture = null;

public RemoteCLU(String name, Inet4Address address, Inet4Address localAddress, CipherKey cipherKey, int port) {
super(
name,
Expand All @@ -56,8 +59,16 @@ public RemoteCLU(String name, Inet4Address address, Inet4Address localAddress, C
register(Methods.EXECUTE, arg1 -> {
final String script = arg1.checkjstring();

if (lastfuture != null && !lastfuture.isDone()) {
try {
lastfuture.get();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

if (script.startsWith(LuaScriptCommand.SET_VARS)) {
executor.submit(() -> client.execute(script));
lastfuture = executor.submit(() -> client.execute(script));
} else {
final Optional<String> returnValueOptional;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public VirtualObject(
Class<? extends Enum<? extends IMethod>> methodClass,
Class<? extends Enum<? extends IEvent>> eventClass
) {
this.name = name;
this.name = name;
this.scheduler = ThreadUtil.virtualScheduler(name);

this.featureClass = featureClass;
Expand Down Expand Up @@ -249,7 +249,7 @@ public boolean triggerEvent(IEvent event) {
try {
LOGGER.debug("{}.triggerEvent({})", name, event.name());

scheduler.submit(() -> luaFunction.call());
scheduler.execute(luaFunction::call);

return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import io.jstach.jstachio.JStachio;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -58,14 +57,12 @@ public class MockServer implements Closeable {

private final UDPSocket broadcastSocket;

private final UDPSocket globalBroadcastSocket;
private final UDPSocket commandSocket;

private final UDPSocket socket;
private final UDPSocket responseSocket;

private final Server server;

private Future<Void> serverFuture;

public MockServer(CipherKey projectCipherKey, long serialNumber) throws Exception {
this(
projectCipherKey,
Expand All @@ -81,15 +78,15 @@ public MockServer(CipherKey projectCipherKey, long serialNumber) throws Exceptio
}

public MockServer(CipherKey projectCipherKey, CLUDevice cluDevice) throws Exception {
this.rootDirectory = FileUtil.temporaryDirectory();
this.rootDirectory = FileUtil.temporaryDirectory();
this.aDriveDirectory = rootDirectory.resolve("a");

FileUtil.mkdir(aDriveDirectory);

this.broadcastSocket = new UDPSocket(LOCALHOST, 0, false);
this.globalBroadcastSocket = new UDPSocket(LOCALHOST, 0, false);
this.socket = new UDPSocket(LOCALHOST, 0, false);
this.tftpServer = new TFTPServer(LOCALHOST, 0, ServerMode.GET_AND_REPLACE, rootDirectory);
this.broadcastSocket = new UDPSocket(LOCALHOST, 0, false);
this.commandSocket = new UDPSocket(LOCALHOST, 0, false);
this.responseSocket = new UDPSocket(LOCALHOST, 0, false);
this.tftpServer = new TFTPServer(LOCALHOST, 0, ServerMode.GET_AND_REPLACE, rootDirectory);

this.tftpServer.start();
this.tftpServer.stop();
Expand All @@ -110,12 +107,12 @@ public MockServer(CipherKey projectCipherKey, CLUDevice cluDevice) throws Except
rootDirectory,
projectCipherKey,
cluDevice,
broadcastSocket, globalBroadcastSocket, socket,
broadcastSocket, commandSocket, responseSocket,
tftpServer
);
}

public void start() throws InterruptedException {
public void start() {
server.start();
}

Expand Down Expand Up @@ -146,7 +143,7 @@ public Path getADriveDirectory() {
}

public int getPort() {
return socket.getLocalPort();
return commandSocket.getLocalPort();
}

public int getBroadcastPort() {
Expand All @@ -163,13 +160,6 @@ public Server getServer() {

@Override
public void close() {
ThreadUtil.cancel(serverFuture);
try {
serverFuture.get();
} catch (Exception e) {
//
}

ThreadUtil.closeQuietly(executor);

IOUtil.closeQuietly(server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static void tearDown() throws Exception {
}

@Test
@Timeout(10)
@Timeout(30)
void remoteCommunication() throws Exception {
assertEquals(LuaValue.NIL, server1.getServer().luaCall("testVariable"));

Expand Down
13 changes: 1 addition & 12 deletions runtime/root/a/CONFIG.JSON
Original file line number Diff line number Diff line change
@@ -1,12 +1 @@
{
"sn": 0,
"mac": "0e:aa:55:aa:55:aa",
"hwType": 19,
"hwVer": 1,
"fwType": 3,
"fwApiVer": 11163050,
"fwVer": "0.0.0-VIRTUAL",
"status": "OK",
"tfbusDevices": [],
"zwaveDevices": []
}
{"sn":0,"mac":"8c:1d:96:ee:fc:2d","hwType":19,"hwVer":1,"fwType":3,"fwApiVer":11163050,"fwVer":"0.0.0-VIRTUAL","status":"OK","tfbusDevices":[],"firmwareVersionString":"0.0.0-VIRTUAL"}
Loading

0 comments on commit 6e35994

Please sign in to comment.