Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web and third-party thread pool monitoring data is reported to es #1576

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ private static void handleElasticSearch(MonitorHandlerTypeEnum type, MonitorHand
context.monitors.add(new DynamicThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@

public class BeanUtilTest {

@Test
public void beanToBeanConvertTest(){
// Test BeanToAnotherBean
final Person person = new Person();
person.setName("Hippo4j");
person.setAge(1);
person.setAddress("hippo4j.cn");
person.setSize(999);
GoodPerson goodPerson = BeanUtil.convert(person, GoodPerson.class);
Assert.assertSame(goodPerson.getClass(), GoodPerson.class);
Assert.assertEquals("Hippo4j", person.getName());
Assert.assertEquals(1, person.getAge());
Assert.assertEquals("hippo4j.cn", person.getAddress());
Assert.assertEquals(999, person.getSize().intValue());
}

@Test
public void beanToMapConvertTest() {
// 测试BeanToMap
Expand Down Expand Up @@ -171,4 +187,12 @@ static class PreCustomer {
String name;
Integer statusCode;
}

@Getter
@Setter
static class GoodPerson extends Person{

String gender;
String nature;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public DynamicThreadPoolElasticSearchMonitorHandler dynamicThreadPoolElasticSear
@Bean
@ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('web')")
public WebThreadPoolElasticSearchMonitorHandler webThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
return new WebThreadPoolElasticSearchMonitorHandler(handler);
return new WebThreadPoolElasticSearchMonitorHandler();
}

@Bean
@ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('adapter')")
public AdapterThreadPoolElasticSearchMonitorHandler adapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
return new AdapterThreadPoolElasticSearchMonitorHandler(handler);
return new AdapterThreadPoolElasticSearchMonitorHandler();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,144 @@

package cn.hippo4j.monitor.elasticsearch;

import cn.hippo4j.common.model.ThreadPoolAdapterState;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.FileUtil;
import cn.hippo4j.monitor.base.AbstractAdapterThreadPoolMonitor;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Adapter thread-pool elastic-search monitor handler.
*/
@Slf4j
public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor {
public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractAdapterThreadPoolMonitor {

public AdapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
super(handler);
private AtomicBoolean isIndexExist = null;
@Override
protected void execute(ThreadPoolAdapterState threadPoolAdapterState) {
ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(threadPoolAdapterState, ElasticSearchThreadPoolRunStateInfo.class);
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String indexName = environment.getProperty("es.thread-pool-state.index.name", "adapter-thread-pool-state");
String applicationName = environment.getProperty("spring.application.name", "application");
if (!this.isExists(indexName)) {
List<String> rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
this.log2Es(esThreadPoolRunStateInfo, indexName);
}

@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
// TODO
public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
try {
IndexRequest request = new IndexRequest(indexName, "_doc");
request.id(esThreadPoolRunStateInfo.getId());
String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo);
request.source(stateJson, XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("write thread-pool state to es, id is :{}", response.getId());
} catch (Exception ex) {
log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ",
indexName,
"_doc",
esThreadPoolRunStateInfo.getId(),
ex);
}
}

public synchronized boolean isExists(String index) {
if (Objects.isNull(isIndexExist)) {
boolean exists = false;
GetIndexRequest request = new GetIndexRequest(index);
try {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
exists = client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("check es index fail");
}
isIndexExist = new AtomicBoolean(exists);
}
return isIndexExist.get();
}

public void createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
if (StringUtils.hasText(esIndex.getMapping())) {
request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
.put("index.number_of_shards", esIndex.getShards())
.put("index.number_of_replicas", esIndex.getReplicas()));
}
if (StringUtils.hasText(esIndex.getAlias())) {
request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
acknowledged = createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("create es index exception", e);
}
if (acknowledged) {
log.info("create es index success");
isIndexExist.set(true);
} else {
log.error("create es index fail");
throw new RuntimeException("cannot auto create thread-pool state es index");
}
}

@Override
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}

/**
* Es Index
*/
@Getter
@Builder
private static class EsIndex {

String index;
String type;
String mapping;
Integer shards;
Integer replicas;
String alias;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,140 @@
package cn.hippo4j.monitor.elasticsearch;

import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.FileUtil;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.AbstractWebThreadPoolMonitor;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Web thread-pool elastic-search monitor handler.
*/
@Slf4j
public class WebThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor {

public WebThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) {
super(handler);
}
public class WebThreadPoolElasticSearchMonitorHandler extends AbstractWebThreadPoolMonitor {

private AtomicBoolean isIndexExist = null;
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
// TODO
ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ElasticSearchThreadPoolRunStateInfo.class);
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String indexName = environment.getProperty("es.thread-pool-state.index.name", "web-thread-pool-state");
String applicationName = environment.getProperty("spring.application.name", "application");
if (!this.isExists(indexName)) {
List<String> rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
this.log2Es(esThreadPoolRunStateInfo, indexName);
}

public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
try {
IndexRequest request = new IndexRequest(indexName, "_doc");
request.id(esThreadPoolRunStateInfo.getId());
String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo);
request.source(stateJson, XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("write thread-pool state to es, id is :{}", response.getId());
} catch (Exception ex) {
log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ",
indexName,
"_doc",
esThreadPoolRunStateInfo.getId(),
ex);
}
}

public synchronized boolean isExists(String index) {
if (Objects.isNull(isIndexExist)) {
boolean exists = false;
GetIndexRequest request = new GetIndexRequest(index);
try {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
exists = client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("check es index fail");
}
isIndexExist = new AtomicBoolean(exists);
}
return isIndexExist.get();
}

public void createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
if (StringUtils.hasText(esIndex.getMapping())) {
request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
.put("index.number_of_shards", esIndex.getShards())
.put("index.number_of_replicas", esIndex.getReplicas()));
}
if (StringUtils.hasText(esIndex.getAlias())) {
request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
acknowledged = createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("create es index exception", e);
}
if (acknowledged) {
log.info("create es index success");
isIndexExist.set(true);
} else {
log.error("create es index fail");
throw new RuntimeException("cannot auto create thread-pool state es index");
}
}
@Override
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}
}

/**
* Es Index
*/
@Getter
@Builder
private static class EsIndex {

String index;
String type;
String mapping;
Integer shards;
Integer replicas;
String alias;
}
}
Loading