Skip to content

Commit b313a81

Browse files
committed
增加mqtt相关代码逻辑
Update pom.xml 新增mqtt 新增mqtt组件 调整 提交 Update MessageHeaderWarp.java 提交warp 增加mqtt代码 提交代码 提交部分代码
1 parent 6b3785d commit b313a81

17 files changed

+1061
-75
lines changed

arex-instrumentation/mq/arex-integration-mqtt/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>arex-instrumentation-parent</artifactId>
77
<groupId>io.arex</groupId>
8-
<version>0.2.0</version>
8+
<version>${revision}</version>
99
<relativePath>../../pom.xml</relativePath>
1010
</parent>
1111
<modelVersion>4.0.0</modelVersion>
@@ -14,9 +14,9 @@
1414

1515
<dependencies>
1616
<dependency>
17-
<groupId>org.springframework.integration</groupId>
18-
<artifactId>spring-integration-mqtt</artifactId>
19-
<version>5.5.10</version>
17+
<groupId>org.springframework</groupId>
18+
<artifactId>spring-messaging</artifactId>
19+
<version>5.3.4</version>
2020
<scope>provided</scope>
2121
</dependency>
2222
</dependencies>
Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,83 @@
11
package io.arex.inst.mqtt;
22

3-
import io.arex.agent.bootstrap.model.Mocker;
4-
import io.arex.inst.runtime.util.MockUtils;
3+
import io.arex.agent.bootstrap.internal.Pair;
4+
import io.arex.agent.bootstrap.util.StringUtil;
5+
import io.arex.inst.mqtt.adapter.MessageAdapter;
6+
import io.arex.inst.runtime.config.Config;
7+
import io.arex.inst.runtime.listener.CaseEvent;
8+
import io.arex.inst.runtime.listener.CaseEventDispatcher;
9+
import io.arex.inst.runtime.listener.EventSource;
10+
import io.arex.inst.runtime.model.ArexConstants;
11+
import io.arex.inst.runtime.util.IgnoreUtils;
512

613
/**
714
* MQTTAdapterHelper
815
*/
916
public class MQTTAdapterHelper {
1017

11-
public static Mocker createMocker(String operationName) {
12-
Mocker mocker = MockUtils.createMqttConsumer(operationName);
13-
mocker.getTargetRequest().setType(Byte.class.getName());
14-
return mocker;
18+
public static final String PROCESSED_FLAG = "arex-processed-flag";
19+
20+
21+
public static <MC, Msg> Pair<MC, Msg> onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
22+
Msg msg = adapter.warpMessage(message);
23+
if (msg == null) {
24+
return null;
25+
}
26+
if (adapter.markProcessed(msg, PROCESSED_FLAG)) {
27+
return null;
28+
}
29+
MC mc = adapter.warpMC(messageChannel);
30+
if (mc == null){
31+
return null;
32+
}
33+
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
34+
if (shouldSkip(adapter, mc, msg)) {
35+
return null;
36+
}
37+
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
38+
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
39+
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
40+
return Pair.of(mc,msg);
41+
}
42+
43+
44+
public static <MC, Msg> void onServiceExit(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message){
45+
Msg msg = adapter.warpMessage(message);
46+
MC mc = adapter.warpMC(messageChannel);
47+
if (msg == null || mc == null) {
48+
return;
49+
}
50+
adapter.removeHeader(mc,msg,PROCESSED_FLAG);
51+
new MessageQueueExtractor<>( mc, msg,adapter).execute();
52+
}
53+
54+
55+
56+
57+
private static<MC, Msg> boolean shouldSkip(MessageAdapter<MC, Msg> adapter,MC mc, Msg msg){
58+
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
59+
// Replay scene
60+
if (StringUtil.isNotEmpty(caseId)) {
61+
return Config.get().getBoolean("arex.disable.replay", false);
62+
}
63+
64+
String forceRecord = adapter.getHeader(mc, msg, ArexConstants.FORCE_RECORD);
65+
// Do not skip if header with arex-force-record=true
66+
if (Boolean.parseBoolean(forceRecord)) {
67+
return false;
68+
}
69+
// Skip if request header with arex-replay-warm-up=true
70+
if (Boolean.parseBoolean(adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP))) {
71+
return true;
72+
}
73+
String topic = adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP);
74+
if (StringUtil.isEmpty(topic)) {
75+
return false;
76+
}
77+
if (IgnoreUtils.ignoreOperation(topic)) {
78+
return true;
79+
}
80+
return Config.get().invalidRecord(topic);
1581
}
82+
1683
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.arex.inst.mqtt;
2+
3+
import io.arex.agent.bootstrap.model.Mocker;
4+
import io.arex.inst.mqtt.adapter.MessageAdapter;
5+
import io.arex.inst.runtime.context.ContextManager;
6+
import io.arex.inst.runtime.model.ArexConstants;
7+
import io.arex.inst.runtime.util.LogUtil;
8+
import io.arex.inst.runtime.util.MockUtils;
9+
import org.springframework.messaging.MessageHeaders;
10+
11+
import java.util.Base64;
12+
import java.util.Collections;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
/**
17+
* @author : MentosL
18+
* @date : 2023/5/9 22:16
19+
*/
20+
public class MessageQueueExtractor<MC,Msg> {
21+
private final MC messageChannel;
22+
private final Msg message;
23+
private final MessageAdapter<MC,Msg> adapter;
24+
25+
26+
public MessageQueueExtractor(MC messageChannel, Msg message, MessageAdapter<MC, Msg> adapter) {
27+
this.messageChannel = messageChannel;
28+
this.message = message;
29+
this.adapter = adapter;
30+
}
31+
32+
33+
public void execute() {
34+
try {
35+
if (message == null || messageChannel == null || adapter == null) {
36+
return;
37+
}
38+
if (!ContextManager.needRecordOrReplay()) {
39+
return;
40+
}
41+
executeBeforeProcess();
42+
doExecute();
43+
executeAfterProcess();
44+
} catch (Exception e) {
45+
LogUtil.warn("MessageQueue.execute", e);
46+
}
47+
}
48+
49+
private void executeBeforeProcess() {
50+
if (ContextManager.needRecord()) {
51+
adapter.addHeader(messageChannel,message, ArexConstants.RECORD_ID,ContextManager.currentContext().getCaseId());
52+
}
53+
if (ContextManager.needReplay()) {
54+
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
55+
}
56+
}
57+
private void executeAfterProcess(){
58+
// Think about other ways to replace the head
59+
adapter.resetMsg(message);
60+
}
61+
62+
private void doExecute() {
63+
Mocker mocker = MockUtils.createMqttConsumer(adapter.getHeader(messageChannel,message,"mqtt_receivedTopic"));
64+
MessageHeaders header = adapter.getHeader(messageChannel, message);
65+
Map<String, Object> requestOrigin = new HashMap<>();
66+
for (Map.Entry<String, Object> entry : header.entrySet()) {
67+
requestOrigin.put(entry.getKey(), entry.getValue());
68+
}
69+
Map<String, Object> requestAttributes = Collections.singletonMap("Headers", requestOrigin);
70+
mocker.getTargetRequest().setAttributes(requestAttributes);
71+
mocker.getTargetRequest().setBody(Base64.getEncoder().encodeToString(adapter.getMsg(messageChannel,message)));
72+
if (ContextManager.needReplay()) {
73+
MockUtils.replayMocker(mocker);
74+
} else if (ContextManager.needRecord()) {
75+
MockUtils.recordMocker(mocker);
76+
}
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.arex.inst.mqtt.adapter;
2+
3+
import org.springframework.messaging.MessageHeaders;
4+
5+
/**
6+
* MessageAdapter
7+
*/
8+
public interface MessageAdapter<MC,Msg> {
9+
10+
MC warpMC(Object messageChannel);
11+
12+
Msg warpMessage(Object message);
13+
14+
byte[] getMsg(MC c, Msg msg);
15+
16+
MessageHeaders getHeader(MC c, Msg msg);
17+
18+
boolean markProcessed(Msg msg,String flagKey);
19+
20+
String getHeader(MC mc,Msg msg,String key);
21+
22+
boolean removeHeader(MC mc,Msg msg,String key);
23+
24+
boolean addHeader(MC mc,Msg msg,String key,String value);
25+
26+
Msg resetMsg(Msg msg);
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package io.arex.inst.mqtt.adapter;
2+
3+
import io.arex.agent.bootstrap.util.StringUtil;
4+
import io.arex.inst.mqtt.warp.GenericMessageWarp;
5+
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
6+
import io.arex.inst.runtime.util.LogUtil;
7+
import org.springframework.messaging.Message;
8+
import org.springframework.messaging.MessageChannel;
9+
import org.springframework.messaging.MessageHeaders;
10+
import org.springframework.messaging.support.GenericMessage;
11+
12+
import java.lang.reflect.Field;
13+
import java.nio.charset.StandardCharsets;
14+
15+
/**
16+
* MessageImpl
17+
*/
18+
public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> {
19+
20+
private static final MessageAdapterImpl INSTANCE = new MessageAdapterImpl();
21+
22+
public static MessageAdapterImpl getInstance() {
23+
return INSTANCE;
24+
}
25+
26+
@Override
27+
public byte[] getMsg(MessageChannel messageChannel, Message msg) {
28+
if (msg == null){
29+
return new byte[]{};
30+
}
31+
Object payload = msg.getPayload();
32+
if (payload == null){
33+
return new byte[]{};
34+
}
35+
if (payload instanceof byte[]){
36+
return ((byte[]) payload);
37+
}
38+
return payload.toString().getBytes(StandardCharsets.UTF_8);
39+
}
40+
41+
@Override
42+
public MessageChannel warpMC(Object messageChannel) {
43+
if (messageChannel == null){
44+
return null;
45+
}
46+
if (messageChannel instanceof MessageChannel){
47+
return (MessageChannel) messageChannel;
48+
}
49+
return null;
50+
}
51+
52+
@Override
53+
public Message warpMessage(Object message) {
54+
if (message == null){
55+
return null;
56+
}
57+
if (message instanceof GenericMessageWarp){
58+
return (GenericMessageWarp) message;
59+
}
60+
61+
if (message instanceof GenericMessage) {
62+
GenericMessage messageTemp = (GenericMessage) message;
63+
MessageHeaders headers = messageTemp.getHeaders();
64+
MessageHeaderWarp messageHeaderWarp = new MessageHeaderWarp(headers);
65+
return new GenericMessageWarp(messageTemp.getPayload(), messageHeaderWarp);
66+
}
67+
if (message instanceof Message){
68+
return (Message)message;
69+
}
70+
return null;
71+
}
72+
73+
@Override
74+
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
75+
if (msg == null){
76+
return null;
77+
}
78+
if (msg instanceof GenericMessageWarp){
79+
GenericMessageWarp messageTemp = (GenericMessageWarp) msg;
80+
return messageTemp.getMessageHeaderWarp();
81+
}
82+
return msg.getHeaders();
83+
}
84+
85+
@Override
86+
public boolean markProcessed(Message message, String flagKey) {
87+
if (message == null){
88+
return true;
89+
}
90+
if (message instanceof GenericMessageWarp){
91+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp)message;
92+
genericMessageWarp.put(flagKey,Boolean.TRUE.toString());
93+
}
94+
return false;
95+
}
96+
97+
@Override
98+
public String getHeader(MessageChannel messageChannel, Message message, String key) {
99+
if (message == null || StringUtil.isEmpty(key)){
100+
return null;
101+
}
102+
if (message instanceof GenericMessageWarp) {
103+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
104+
Object object = genericMessageWarp.get(key);
105+
return object != null ? object.toString() : null;
106+
}
107+
108+
if(message instanceof GenericMessage){
109+
Object obj = message.getHeaders().get(key);
110+
return obj != null ? obj.toString() : null;
111+
}
112+
if (message.getHeaders() != null){
113+
Object obj = message.getHeaders().get(key);
114+
return obj != null ? obj.toString() : null ;
115+
}
116+
return null;
117+
}
118+
119+
@Override
120+
public boolean removeHeader(MessageChannel messageChannel, Message message, String key) {
121+
if (message == null || StringUtil.isEmpty(key)){
122+
return false;
123+
}
124+
if (message instanceof GenericMessageWarp){
125+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
126+
genericMessageWarp.removeHeader(key);
127+
return true;
128+
}
129+
return false;
130+
}
131+
132+
@Override
133+
public boolean addHeader(MessageChannel messageChannel, Message message, String key, String value) {
134+
if (message == null ){
135+
return false;
136+
}
137+
if (message instanceof GenericMessageWarp){
138+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
139+
genericMessageWarp.put(key,value);
140+
return true;
141+
}
142+
return false;
143+
}
144+
145+
@Override
146+
public Message resetMsg(Message message) {
147+
if (message == null){
148+
return null;
149+
}
150+
if (message instanceof GenericMessageWarp){
151+
try {
152+
GenericMessageWarp messageWarp = (GenericMessageWarp) message;
153+
Field headers = message.getClass().getSuperclass().getDeclaredField("headers");
154+
headers.setAccessible(true);
155+
headers.set(message, messageWarp.getMessageHeaderWarp());
156+
} catch (NoSuchFieldException e) {
157+
LogUtil.warn("MessageAdapterImpl.resetMsg - NoSuchFieldException", e);
158+
} catch (IllegalAccessException e) {
159+
LogUtil.warn("MessageAdapterImpl.resetMsg - IllegalAccessException", e);
160+
}
161+
}
162+
return message;
163+
}
164+
165+
166+
}

0 commit comments

Comments
 (0)