Skip to content

Commit 3723a1e

Browse files
committed
update network protocol
1 parent c1fa88f commit 3723a1e

24 files changed

+540
-49
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/.idea/
22
/.gradle/
33
/build/
4+
/out/

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.4.0"
35+
rlibVersion = "9.5.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.1.8.RELEASE'
3838
springVersion = '5.1.6.RELEASE'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.ss.mqtt.broker.model;
2+
3+
public interface MqttPropertyConstants {
4+
5+
int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024;
6+
7+
long SESSION_EXPIRY_INTERVAL_DEFAULT = 0;
8+
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFF;
9+
10+
int RECEIVE_MAXIMUM_DEFAULT = 0xFFFF;
11+
int RECEIVE_MAXIMUM_MIN = 1;
12+
int RECEIVE_MAXIMUM_MAX = RECEIVE_MAXIMUM_DEFAULT;
13+
14+
int MAXIMUM_PACKET_SIZE_DEFAULT = MAXIMUM_PROTOCOL_PACKET_SIZE;
15+
int MAXIMUM_PACKET_SIZE_MIN = 1;
16+
int MAXIMUM_PACKET_SIZE_MAX = MAXIMUM_PROTOCOL_PACKET_SIZE;
17+
18+
int TOPIC_ALIAS_MAXIMUM_DEFAULT = 0;
19+
}

src/main/java/com/ss/mqtt/broker/model/PacketProperty.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.ss.mqtt.broker.model;
22

3+
import com.ss.rlib.common.util.ClassUtils;
4+
import com.ss.rlib.common.util.ObjectUtils;
35
import lombok.Getter;
46
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
58

69
import java.util.stream.Stream;
710

@@ -61,9 +64,19 @@ public enum PacketProperty {
6164

6265
private final @Getter byte id;
6366
private final @Getter PacketDataType dataType;
67+
private final Object defaultValue;
6468

6569
PacketProperty(int id, @NotNull PacketDataType dataType) {
70+
this(id, dataType, null);
71+
}
72+
73+
PacketProperty(int id, @NotNull PacketDataType dataType, @Nullable Object defaultValue) {
6674
this.id = (byte) id;
6775
this.dataType = dataType;
76+
this.defaultValue = defaultValue;
77+
}
78+
79+
public <T> @NotNull T getDefaultValue() {
80+
return ClassUtils.unsafeNNCast(ObjectUtils.notNull(defaultValue));
6881
}
6982
}

src/main/java/com/ss/mqtt/broker/network/MqttClient.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.network;
22

33
import com.ss.mqtt.broker.model.ConnectReasonCode;
4+
import com.ss.mqtt.broker.model.MqttPropertyConstants;
45
import com.ss.mqtt.broker.model.MqttVersion;
56
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
67
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
@@ -11,6 +12,14 @@ public class MqttClient {
1112

1213
private final @Getter MqttConnection connection;
1314

15+
private volatile @Getter String clientId;
16+
private volatile @Getter String serverClientId;
17+
18+
private volatile @Getter long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
19+
private volatile @Getter int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
20+
private volatile @Getter int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
21+
private volatile @Getter int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
22+
1423
private volatile MqttVersion mqttVersion;
1524

1625
public MqttClient(@NotNull MqttConnection connection) {
@@ -23,7 +32,14 @@ public void reject(@NotNull ConnectReasonCode returnCode) {
2332
}
2433

2534
public void onConnected(@NotNull ConnectInPacket connect) {
26-
this.mqttVersion = connect.getMqttVersion();
35+
mqttVersion = connect.getMqttVersion();
36+
sessionExpiryInterval = connect.getSessionExpiryInterval();
37+
receiveMax = connect.getReceiveMax();
38+
maximumPacketSize = connect.getMaximumPacketSize();
39+
clientId = connect.getClientId();
40+
serverClientId = connect.getClientId();
41+
topicAliasMaximum = connect.getTopicAliasMaximum();
42+
2743
connection.send(getPacketOutFactory().newConnectAck(this, ConnectReasonCode.SUCCESSFUL, false));
2844
}
2945

src/main/java/com/ss/mqtt/broker/network/packet/MqttPacketReader.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.ss.mqtt.broker.network.packet;
22

33
import com.ss.mqtt.broker.network.MqttConnection;
4-
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
5-
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
4+
import com.ss.mqtt.broker.network.packet.in.*;
65
import com.ss.mqtt.broker.util.MqttDataUtils;
76
import com.ss.rlib.common.function.ByteFunction;
87
import com.ss.rlib.common.util.NumberUtils;
@@ -22,7 +21,21 @@ public class MqttPacketReader extends AbstractPacketReader<MqttReadablePacket, M
2221

2322
private static final ByteFunction<MqttReadablePacket>[] PACKET_FACTORIES = ArrayFactory.toArray(
2423
null,
25-
ConnectInPacket::new
24+
ConnectInPacket::new,
25+
null,
26+
PublishInPacket::new,
27+
PublishAckInPacket::new,
28+
PublishReceivedInPacket::new,
29+
PublishReleaseInPacket::new,
30+
PublishCompleteInPacket::new,
31+
SubscribeInPacket::new,
32+
null,
33+
UnsubscribeInPacket::new,
34+
null,
35+
PingInPacket::new,
36+
null,
37+
DisconnectInPacket::new,
38+
AuthenticateInPacket::new
2639
);
2740

2841
public MqttPacketReader(
@@ -81,7 +94,7 @@ protected int readPacketLength(@NotNull ByteBuffer buffer) {
8194
) {
8295

8396
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901021
84-
var startByte = buffer.get(startPacketPosition);
97+
var startByte = Byte.toUnsignedInt(buffer.get(startPacketPosition));
8598
var type = NumberUtils.getHighByteBits(startByte);
8699
var info = NumberUtils.getLowByteBits(startByte);
87100

src/main/java/com/ss/mqtt/broker/network/packet/MqttPacketWriter.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,41 @@ protected boolean onBeforeWrite(
3333
@NotNull MqttWritablePacket packet,
3434
int expectedLength,
3535
int totalSize,
36-
@NotNull ByteBuffer buffer
36+
@NotNull ByteBuffer firstBuffer,
37+
@NotNull ByteBuffer secondBuffer
3738
) {
38-
buffer.put((byte) packet.getPacketTypeAndFlags());
39-
MqttDataUtils.writeMbi(expectedLength, buffer);
39+
firstBuffer.clear();
40+
secondBuffer.clear();
41+
return true;
42+
}
43+
44+
@Override
45+
protected boolean onWrite(
46+
@NotNull MqttWritablePacket packet,
47+
int expectedLength,
48+
int totalSize,
49+
@NotNull ByteBuffer firstBuffer,
50+
@NotNull ByteBuffer secondBuffer
51+
) {
52+
if (!packet.write(secondBuffer)) {
53+
return false;
54+
} else {
55+
secondBuffer.flip();
56+
return true;
57+
}
58+
}
59+
60+
@Override
61+
protected boolean onAfterWrite(
62+
@NotNull MqttWritablePacket packet,
63+
int expectedLength,
64+
int totalSize,
65+
@NotNull ByteBuffer firstBuffer,
66+
@NotNull ByteBuffer secondBuffer
67+
) {
68+
firstBuffer.put((byte) packet.getPacketTypeAndFlags());
69+
MqttDataUtils.writeMbi(secondBuffer.remaining(), firstBuffer);
70+
firstBuffer.put(secondBuffer).flip();
4071
return true;
4172
}
4273
}

src/main/java/com/ss/mqtt/broker/network/packet/PacketType.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ public enum PacketType {
1616
PING_REQUEST,
1717
PING_RESPONSE,
1818
DISCONNECT,
19-
AUTHENTICATE;
19+
AUTHENTICATE
2020
}

src/main/java/com/ss/mqtt/broker/network/packet/in/AuthenticateInPacket.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class AuthenticateInPacket extends MqttReadablePacket {
99

1010
private static final byte PACKET_TYPE = (byte) PacketType.AUTHENTICATE.ordinal();
1111

12-
protected AuthenticateInPacket(byte info) {
12+
public AuthenticateInPacket(byte info) {
1313
super(info);
1414
}
1515

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

+71-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.model.ConnectReasonCode;
4+
import com.ss.mqtt.broker.model.MqttPropertyConstants;
45
import com.ss.mqtt.broker.model.MqttVersion;
56
import com.ss.mqtt.broker.model.PacketProperty;
67
import com.ss.mqtt.broker.network.MqttConnection;
78
import com.ss.mqtt.broker.network.packet.PacketType;
89
import com.ss.rlib.common.util.NumberUtils;
910
import lombok.Getter;
1011
import org.jetbrains.annotations.NotNull;
12+
import org.jetbrains.annotations.Nullable;
1113

1214
import java.nio.ByteBuffer;
1315
import java.util.EnumSet;
@@ -96,6 +98,7 @@ public class ConnectInPacket extends MqttReadablePacket {
9698
The User Property is allowed to appear multiple times to represent multiple name, value pairs. The same
9799
name is allowed to appear more than once
98100
*/
101+
// FIXME to do supporting
99102
PacketProperty.USER_PROPERTY,
100103
/*
101104
Followed by a UTF-8 Encoded String containing the name of the authentication method used for
@@ -190,9 +193,19 @@ public class ConnectInPacket extends MqttReadablePacket {
190193

191194
private @Getter byte[] willPayload;
192195

196+
// properties
197+
private @Nullable @Getter String authenticationMethod;
198+
private @Nullable @Getter byte[] authenticationData;
199+
200+
private @Getter long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
201+
private @Getter int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
202+
private @Getter int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
203+
private @Getter int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
204+
private @Getter boolean requestResponseInformation = false;
205+
private @Getter boolean requestProblemInformation = false;
206+
193207
private @Getter int keepAlive;
194208
private @Getter int willQos;
195-
196209
private @Getter boolean willRetain;
197210
private @Getter boolean cleanStart;
198211

@@ -291,16 +304,70 @@ protected void readImpl(@NotNull MqttConnection connection, @NotNull ByteBuffer
291304

292305
@Override
293306
protected void applyProperty(@NotNull PacketProperty property, @NotNull byte[] value) {
294-
super.applyProperty(property, value);
307+
switch (property) {
308+
case AUTHENTICATION_DATA:
309+
authenticationData = value;
310+
break;
311+
default:
312+
unexpectedProperty(property);
313+
return;
314+
}
295315
}
296316

297317
@Override
298318
protected void applyProperty(@NotNull PacketProperty property, @NotNull String value) {
299-
super.applyProperty(property, value);
319+
switch (property) {
320+
case AUTHENTICATION_METHOD:
321+
authenticationMethod = value;
322+
break;
323+
default:
324+
unexpectedProperty(property);
325+
}
300326
}
301327

302328
@Override
303329
protected void applyProperty(@NotNull PacketProperty property, int value) {
304-
super.applyProperty(property, value);
330+
switch (property) {
331+
case RECEIVE_MAXIMUM:
332+
receiveMax = NumberUtils.validate(
333+
value,
334+
MqttPropertyConstants.RECEIVE_MAXIMUM_MIN,
335+
MqttPropertyConstants.RECEIVE_MAXIMUM_MAX
336+
);
337+
break;
338+
case TOPIC_ALIAS_MAXIMUM:
339+
topicAliasMaximum = value;
340+
break;
341+
case REQUEST_RESPONSE_INFORMATION:
342+
requestResponseInformation = value == 1;
343+
break;
344+
case REQUEST_PROBLEM_INFORMATION:
345+
requestProblemInformation = value == 1;
346+
break;
347+
default:
348+
unexpectedProperty(property);
349+
}
350+
}
351+
352+
@Override
353+
protected void applyProperty(@NotNull PacketProperty property, long value) {
354+
switch (property) {
355+
case SESSION_EXPIRY_INTERVAL:
356+
sessionExpiryInterval = NumberUtils.validate(
357+
value,
358+
0,
359+
MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_INFINITY
360+
);
361+
break;
362+
case MAXIMUM_PACKET_SIZE:
363+
maximumPacketSize = NumberUtils.validate(
364+
(int) value,
365+
MqttPropertyConstants.MAXIMUM_PACKET_SIZE_MIN,
366+
MqttPropertyConstants.MAXIMUM_PACKET_SIZE_MAX
367+
);
368+
break;
369+
default:
370+
unexpectedProperty(property);
371+
}
305372
}
306373
}

src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,16 @@ protected void readProperties(@NotNull ByteBuffer buffer, @NotNull Set<PacketPro
4848

4949
switch (property.getDataType()) {
5050
case BYTE:
51-
applyProperty(property, readByte(buffer));
51+
applyProperty(property, readUnsignedByte(buffer));
5252
break;
5353
case SHORT:
54-
applyProperty(property, readShort(buffer));
54+
applyProperty(property, readUnsignedShort(buffer));
5555
break;
5656
case INTEGER:
57-
applyProperty(property, readInt(buffer));
57+
applyProperty(property, readUnsignedInt(buffer));
5858
break;
5959
case MULTI_BYTE_INTEGER:
60-
applyProperty(property, (int) MqttDataUtils.readMbi(buffer));
60+
applyProperty(property, MqttDataUtils.readMbi(buffer));
6161
break;
6262
case UTF_8_STRING:
6363
applyProperty(property, readString(buffer));
@@ -78,14 +78,25 @@ protected void readProperties(@NotNull ByteBuffer buffer, @NotNull Set<PacketPro
7878
protected void applyProperty(@NotNull PacketProperty property, int value) {
7979
}
8080

81+
protected void applyProperty(@NotNull PacketProperty property, long value) {
82+
}
83+
8184
protected void applyProperty(@NotNull PacketProperty property, @NotNull String value) {
8285
}
8386

8487
protected void applyProperty(@NotNull PacketProperty property, @NotNull byte[] value) {
8588
}
8689

8790
protected int readUnsignedByte(@NotNull ByteBuffer buffer) {
88-
return NumberUtils.toUnsignedByte(buffer.get());
91+
return Byte.toUnsignedInt(buffer.get());
92+
}
93+
94+
protected int readUnsignedShort(@NotNull ByteBuffer buffer) {
95+
return Short.toUnsignedInt(buffer.getShort());
96+
}
97+
98+
protected long readUnsignedInt(@NotNull ByteBuffer buffer) {
99+
return Integer.toUnsignedLong(buffer.get());
89100
}
90101

91102
protected int readMsbLsbInt(@NotNull ByteBuffer buffer) {
@@ -139,4 +150,8 @@ protected int readMsbLsbInt(@NotNull ByteBuffer buffer, int min, int max) {
139150
buffer.get(data);
140151
return data;
141152
}
153+
154+
protected void unexpectedProperty(@NotNull PacketProperty property) {
155+
throw new IllegalArgumentException("Unsupported property: " + property);
156+
}
142157
}

src/main/java/com/ss/mqtt/broker/network/packet/in/PingInPacket.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class PingInPacket extends MqttReadablePacket {
99

1010
public static final byte PACKET_TYPE = (byte) PacketType.PING_REQUEST.ordinal();
1111

12-
protected PingInPacket(byte info) {
12+
public PingInPacket(byte info) {
1313
super(info);
1414
}
1515

0 commit comments

Comments
 (0)