Skip to content

Support MQ - MQTT #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class MockCategoryType implements Serializable {
public static final MockCategoryType DUBBO_CONSUMER = createDependency("DubboConsumer");
public static final MockCategoryType DUBBO_PROVIDER = createEntryPoint("DubboProvider");
public static final MockCategoryType DUBBO_STREAM_PROVIDER = createDependency("DubboStreamProvider");
public static final MockCategoryType MQTT_MESSAGE_CONSUMER = createEntryPoint("MqttMessageConsumer");


private String name;
private boolean entryPoint;
Expand Down
5 changes: 5 additions & 0 deletions arex-agent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@
<artifactId>arex-jcasbin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>arex-integration-mqtt</artifactId>
<version>${project.version}</version>
</dependency>
<!--Agent instrumentation end-->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static ArexMocker createDubboStreamProvider(String operationName) {
return create(MockCategoryType.DUBBO_STREAM_PROVIDER, operationName);
}

public static ArexMocker createMqttConsumer(String operationName){
return create(MockCategoryType.MQTT_MESSAGE_CONSUMER,operationName);
}


public static ArexMocker create(MockCategoryType categoryType, String operationName) {
ArexMocker mocker = new ArexMocker();
long createTime = System.currentTimeMillis();
Expand Down
23 changes: 23 additions & 0 deletions arex-instrumentation/mq/arex-integration-mqtt/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arex-instrumentation-parent</artifactId>
<groupId>io.arex</groupId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>arex-integration-mqtt</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.3.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.arex.inst.mqtt;

import io.arex.agent.bootstrap.internal.Pair;
import io.arex.agent.bootstrap.util.StringUtil;
import io.arex.inst.mqtt.adapter.MessageAdapter;
import io.arex.inst.runtime.config.Config;
import io.arex.inst.runtime.listener.CaseEvent;
import io.arex.inst.runtime.listener.CaseEventDispatcher;
import io.arex.inst.runtime.listener.EventSource;
import io.arex.inst.runtime.model.ArexConstants;
import io.arex.inst.runtime.util.IgnoreUtils;

/**
* MQTTAdapterHelper
*/
public class MQTTAdapterHelper {

public static final String PROCESSED_FLAG = "arex-processed-flag";


public static <MC, Msg> Pair<MC, Msg> onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
Msg msg = adapter.warpMessage(message);
if (msg == null) {
return null;
}
if (adapter.markProcessed(msg, PROCESSED_FLAG)) {
return null;
}
MC mc = adapter.warpMC(messageChannel);
if (mc == null){
return null;
}
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
if (shouldSkip(adapter, mc, msg)) {
return null;
}
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
return Pair.of(mc,msg);
}


public static <MC, Msg> void onServiceExit(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message){
Msg msg = adapter.warpMessage(message);
MC mc = adapter.warpMC(messageChannel);
if (msg == null || mc == null) {
return;
}
adapter.removeHeader(mc,msg,PROCESSED_FLAG);
new MessageQueueExtractor<>( mc, msg,adapter).execute();
}




private static<MC, Msg> boolean shouldSkip(MessageAdapter<MC, Msg> adapter,MC mc, Msg msg){
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
// Replay scene
if (StringUtil.isNotEmpty(caseId)) {
return Config.get().getBoolean("arex.disable.replay", false);
}

String forceRecord = adapter.getHeader(mc, msg, ArexConstants.FORCE_RECORD);
// Do not skip if header with arex-force-record=true
if (Boolean.parseBoolean(forceRecord)) {
return false;
}
// Skip if request header with arex-replay-warm-up=true
if (Boolean.parseBoolean(adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP))) {
return true;
}
String topic = adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP);
if (StringUtil.isEmpty(topic)) {
return false;
}
if (IgnoreUtils.ignoreOperation(topic)) {
return true;
}
return Config.get().invalidRecord(topic);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.arex.inst.mqtt;

import io.arex.agent.bootstrap.model.Mocker;
import io.arex.inst.mqtt.adapter.MessageAdapter;
import io.arex.inst.runtime.context.ContextManager;
import io.arex.inst.runtime.model.ArexConstants;
import io.arex.inst.runtime.util.LogUtil;
import io.arex.inst.runtime.util.MockUtils;
import org.springframework.messaging.MessageHeaders;

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* @author : MentosL
* @date : 2023/5/9 22:16
*/
public class MessageQueueExtractor<MC,Msg> {
private final MC messageChannel;
private final Msg message;
private final MessageAdapter<MC,Msg> adapter;


public MessageQueueExtractor(MC messageChannel, Msg message, MessageAdapter<MC, Msg> adapter) {
this.messageChannel = messageChannel;
this.message = message;
this.adapter = adapter;
}


public void execute() {
try {
if (message == null || messageChannel == null || adapter == null) {
return;
}
if (!ContextManager.needRecordOrReplay()) {
return;
}
executeBeforeProcess();
doExecute();
executeAfterProcess();
} catch (Exception e) {
LogUtil.warn("MessageQueue.execute", e);
}
}

private void executeBeforeProcess() {
if (ContextManager.needRecord()) {
adapter.addHeader(messageChannel,message, ArexConstants.RECORD_ID,ContextManager.currentContext().getCaseId());
}
if (ContextManager.needReplay()) {
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
}
}
private void executeAfterProcess(){
// Think about other ways to replace the head
adapter.resetMsg(message);
}

private void doExecute() {
Mocker mocker = MockUtils.createMqttConsumer(adapter.getHeader(messageChannel,message,"mqtt_receivedTopic"));
MessageHeaders header = adapter.getHeader(messageChannel, message);
Map<String, Object> requestOrigin = new HashMap<>();
for (Map.Entry<String, Object> entry : header.entrySet()) {
requestOrigin.put(entry.getKey(), entry.getValue());
}
Map<String, Object> requestAttributes = Collections.singletonMap("Headers", requestOrigin);
mocker.getTargetRequest().setAttributes(requestAttributes);
mocker.getTargetRequest().setBody(Base64.getEncoder().encodeToString(adapter.getMsg(messageChannel,message)));
if (ContextManager.needReplay()) {
MockUtils.replayMocker(mocker);
} else if (ContextManager.needRecord()) {
MockUtils.recordMocker(mocker);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.arex.inst.mqtt.adapter;

import org.springframework.messaging.MessageHeaders;

/**
* MessageAdapter
*/
public interface MessageAdapter<MC,Msg> {

MC warpMC(Object messageChannel);

Msg warpMessage(Object message);

byte[] getMsg(MC c, Msg msg);

MessageHeaders getHeader(MC c, Msg msg);

boolean markProcessed(Msg msg,String flagKey);

String getHeader(MC mc,Msg msg,String key);

boolean removeHeader(MC mc,Msg msg,String key);

boolean addHeader(MC mc,Msg msg,String key,String value);

Msg resetMsg(Msg msg);

}
Loading