Skip to content

Commit 7cae3db

Browse files
authored
Merge pull request #537 from yanhom1314/master
optimize adapter, perfect log and check
2 parents 7c6e446 + 9af618a commit 7cae3db

File tree

6 files changed

+59
-49
lines changed

6 files changed

+59
-49
lines changed

adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/alibaba/AlibabaDubboDtpAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public class AlibabaDubboDtpAdapter extends AbstractDtpAdapter implements Initia
5252

5353
@Override
5454
public void afterPropertiesSet() throws Exception {
55-
//从ApplicationReadyEvent改为ContextRefreshedEvent后,
56-
//启动时无法dubbo获取线程池,这里直接每隔1s轮循,直至成功初始化线程池
55+
56+
// 从ApplicationReadyEvent改为ContextRefreshedEvent后,
57+
// 启动时无法dubbo获取线程池,这里直接每隔1s轮循,直至成功初始化线程池
5758
ExecutorService executor = Executors.newSingleThreadExecutor();
5859
executor.submit(() -> {
5960
while (!registered.get()) {

adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/ApacheDubboDtpAdapter.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -97,31 +97,7 @@ protected void initialize() {
9797
super.initialize();
9898
String currVersion = Version.getVersion();
9999
if (DubboVersion.compare(DubboVersion.VERSION_2_7_5, currVersion) > 0) {
100-
// 当前dubbo版本 < 2.7.5
101-
val handlers = JVMTI.getInstances(WrappedChannelHandler.class);
102-
if (CollectionUtils.isEmpty(handlers)) {
103-
return;
104-
}
105-
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
106-
handlers.forEach(handler -> {
107-
//获取WrappedChannelHandler中的原始线程池
108-
val originExecutor = ReflectionUtil.getFieldValue(EXECUTOR_FIELD, handler);
109-
if (!(originExecutor instanceof ExecutorService)) {
110-
return;
111-
}
112-
URL url = handler.getUrl();
113-
//低版本跳过消费者线程池配置
114-
if (!CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
115-
String port = String.valueOf(url.getPort());
116-
String tpName = genTpName(port);
117-
//增强原始线程池,替换为动态线程池代理
118-
enhanceOriginExecutor(tpName, (ThreadPoolExecutor) originExecutor, EXECUTOR_FIELD, handler);
119-
//获取增强后的新动态线程池
120-
Object newExexutor = ReflectionUtil.getFieldValue(EXECUTOR_FIELD, handler);
121-
//替换dataStore中的线程池
122-
dataStore.put(EXECUTOR_SERVICE_COMPONENT_KEY, port, newExexutor);
123-
}
124-
});
100+
handleLessThanV275();
125101
return;
126102
}
127103

@@ -140,9 +116,11 @@ protected void initialize() {
140116
return;
141117
}
142118

143-
//3.0.9 <= 当前dubbo版本 < 3.1.8时,执行线程池使用的是INTERNAL_SERVICE_EXECUTOR
144-
boolean isUseInternalExecutorVersion = DubboVersion.compare(currVersion, DubboVersion.VERSION_3_0_9) >= 0 && DubboVersion.compare(currVersion, DubboVersion.VERSION_3_1_8) < 0;
145-
Map<Object, ExecutorService> executorMap = isUseInternalExecutorVersion ? data.get(INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY) : data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
119+
// 3.0.9 <= 当前dubbo版本 < 3.1.8时,执行线程池使用的是INTERNAL_SERVICE_EXECUTOR
120+
boolean useInternalExecutorVersion = DubboVersion.compare(currVersion, DubboVersion.VERSION_3_0_9) >= 0 &&
121+
DubboVersion.compare(currVersion, DubboVersion.VERSION_3_1_8) < 0;
122+
Map<Object, ExecutorService> executorMap = useInternalExecutorVersion ?
123+
data.get(INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY) : data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
146124
if (MapUtils.isNotEmpty(executorMap)) {
147125
executorMap.forEach((k, v) -> {
148126
ThreadPoolExecutor proxy = getProxy(v);
@@ -151,6 +129,36 @@ protected void initialize() {
151129
});
152130
}
153131
}
132+
133+
/**
134+
* Handle versions less than 2.7.5
135+
*/
136+
private void handleLessThanV275() {
137+
val handlers = JVMTI.getInstances(WrappedChannelHandler.class);
138+
if (CollectionUtils.isEmpty(handlers)) {
139+
return;
140+
}
141+
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
142+
handlers.forEach(handler -> {
143+
val originExecutor = ReflectionUtil.getFieldValue(EXECUTOR_FIELD, handler);
144+
if (!(originExecutor instanceof ExecutorService)) {
145+
return;
146+
}
147+
URL url = handler.getUrl();
148+
// 低版本跳过消费者线程池配置
149+
if (!CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
150+
String port = String.valueOf(url.getPort());
151+
String tpName = genTpName(port);
152+
// 增强原始线程池,替换为动态线程池代理
153+
enhanceOriginExecutor(tpName, (ThreadPoolExecutor) originExecutor, EXECUTOR_FIELD, handler);
154+
// 获取增强后的新动态线程池
155+
Object newExexutor = ReflectionUtil.getFieldValue(EXECUTOR_FIELD, handler);
156+
// 替换dataStore中的线程池
157+
dataStore.put(EXECUTOR_SERVICE_COMPONENT_KEY, port, newExexutor);
158+
}
159+
});
160+
}
161+
154162
private ThreadPoolExecutor getProxy(Executor executor) {
155163
ThreadPoolExecutor proxy;
156164
if (executor instanceof EagerThreadPoolExecutor) {

common/src/main/java/org/dromara/dynamictp/common/parser/json/JacksonCreator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
1718
package org.dromara.dynamictp.common.parser.json;
1819

1920
import com.fasterxml.jackson.annotation.JsonInclude;

core/pom.xml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,6 @@
3737
<artifactId>transmittable-thread-local</artifactId>
3838
</dependency>
3939

40-
<dependency>
41-
<groupId>com.fasterxml.jackson.core</groupId>
42-
<artifactId>jackson-core</artifactId>
43-
</dependency>
44-
45-
<dependency>
46-
<groupId>com.fasterxml.jackson.core</groupId>
47-
<artifactId>jackson-databind</artifactId>
48-
</dependency>
49-
5040
<dependency>
5141
<groupId>cn.hutool</groupId>
5242
<artifactId>hutool-core</artifactId>
@@ -56,7 +46,6 @@
5646
<groupId>io.dropwizard.metrics</groupId>
5747
<artifactId>metrics-core</artifactId>
5848
</dependency>
59-
6049
</dependencies>
6150

6251
</project>

starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/AbstractWebServerDtpAdapter.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.context.ApplicationEvent;
3131
import org.springframework.context.ApplicationListener;
3232

33+
import java.util.Objects;
3334
import java.util.concurrent.Executor;
3435

3536
/**
@@ -51,19 +52,22 @@ public void onApplicationEvent(ApplicationEvent event) {
5152
afterInitialize();
5253
refresh(dtpProperties);
5354
} catch (Exception e) {
54-
log.error("Init web server thread pool failed.", e);
55+
log.error("DynamicTp adapter, {} init failed.", getTpName(), e);
5556
}
5657
}
5758
}
5859

5960
@Override
6061
protected void initialize() {
6162
super.initialize();
62-
if (executors.get(getTpName()) == null) {
63-
ApplicationContext applicationContext = SpringContextHolder.getInstance();
64-
WebServer webServer = ((WebServerApplicationContext) applicationContext).getWebServer();
65-
doEnhance(webServer);
66-
log.info("DynamicTp adapter, web server {} executor init end, executor: {}",
63+
if (Objects.nonNull(executors.get(getTpName()))) {
64+
return;
65+
}
66+
ApplicationContext applicationContext = SpringContextHolder.getInstance();
67+
WebServer webServer = ((WebServerApplicationContext) applicationContext).getWebServer();
68+
doEnhance(webServer);
69+
if (Objects.nonNull(executors.get(getTpName()))) {
70+
log.info("DynamicTp adapter, {} init end, executor: {}",
6771
getTpName(), ExecutorConverter.toMainFields(executors.get(getTpName())));
6872
}
6973
}

starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/undertow/UndertowDtpAdapter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,19 @@ public void doEnhance(WebServer webServer) {
6464
XnioWorker xnioWorker = undertow.getWorker();
6565
Object taskPool = ReflectionUtil.getFieldValue(XnioWorker.class, "taskPool", xnioWorker);
6666
if (Objects.isNull(taskPool)) {
67+
log.warn("DynamicTp adapter, {} enhance failed, taskPool is null.");
6768
return;
6869
}
70+
String tpName = getTpName();
6971
val handler = TaskPoolHandlerFactory.getTaskPoolHandler(taskPool.getClass().getSimpleName());
72+
if (Objects.isNull(handler)) {
73+
log.warn("DynamicTp adapter, {} enhance failed, unsupported TaskPool {}.",
74+
getTpName(), taskPool.getClass().getSimpleName());
75+
return;
76+
}
7077
String internalExecutor = handler.taskPoolType().getInternalExecutor();
7178
Object executor = ReflectionUtil.getFieldValue(taskPool.getClass(), internalExecutor, taskPool);
72-
String tpName = getTpName();
79+
7380
if (executor instanceof ThreadPoolExecutor) {
7481
enhanceOriginExecutor(tpName, (ThreadPoolExecutor) executor, internalExecutor, taskPool);
7582
} else if (executor instanceof EnhancedQueueExecutor) {
@@ -78,7 +85,7 @@ public void doEnhance(WebServer webServer) {
7885
ReflectionUtil.setFieldValue(internalExecutor, taskPool, proxy);
7986
putAndFinalize(tpName, (ExecutorService) executor, new EnhancedQueueExecutorAdapter(proxy));
8087
} catch (Throwable t) {
81-
log.error("DynamicTp adapter, enhance {} failed, please adjust the order of the two dependencies" +
88+
log.warn("DynamicTp adapter, {} enhance failed, please adjust the order of the two dependencies" +
8289
"(spring-boot-starter-undertow and starter-adapter-webserver) and try again.", tpName, t);
8390
executors.put(tpName, new ExecutorWrapper(tpName, handler.adapt(executor)));
8491
}

0 commit comments

Comments
 (0)