From 8ce342b2068f2ef0a516d1f59f12234129b6db37 Mon Sep 17 00:00:00 2001 From: theNorthWindBlow Date: Thu, 10 Oct 2024 21:28:56 +0800 Subject: [PATCH 1/2] test:Test BeanUtil tool --- .../hippo4j/common/toolkit/BeanUtilTest.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/infra/common/src/test/java/cn/hippo4j/common/toolkit/BeanUtilTest.java b/infra/common/src/test/java/cn/hippo4j/common/toolkit/BeanUtilTest.java index d664ef1fcc..4a8cb8f6cf 100644 --- a/infra/common/src/test/java/cn/hippo4j/common/toolkit/BeanUtilTest.java +++ b/infra/common/src/test/java/cn/hippo4j/common/toolkit/BeanUtilTest.java @@ -35,6 +35,22 @@ public class BeanUtilTest { + @Test + public void beanToBeanConvertTest(){ + // Test BeanToAnotherBean + final Person person = new Person(); + person.setName("Hippo4j"); + person.setAge(1); + person.setAddress("hippo4j.cn"); + person.setSize(999); + GoodPerson goodPerson = BeanUtil.convert(person, GoodPerson.class); + Assert.assertSame(goodPerson.getClass(), GoodPerson.class); + Assert.assertEquals("Hippo4j", person.getName()); + Assert.assertEquals(1, person.getAge()); + Assert.assertEquals("hippo4j.cn", person.getAddress()); + Assert.assertEquals(999, person.getSize().intValue()); + } + @Test public void beanToMapConvertTest() { // 测试BeanToMap @@ -171,4 +187,12 @@ static class PreCustomer { String name; Integer statusCode; } + + @Getter + @Setter + static class GoodPerson extends Person{ + + String gender; + String nature; + } } From c56f9cfdac1f25edeab7568d86459e02514eab47 Mon Sep 17 00:00:00 2001 From: theNorthWindBlow Date: Thu, 10 Oct 2024 21:48:54 +0800 Subject: [PATCH 2/2] feature: web and adapter thread pools monitor data reporting es --- .../monitor/MonitorHandlersConfigurator.java | 4 +- ...ElasticSearchMonitorAutoConfiguration.java | 4 +- ...ThreadPoolElasticSearchMonitorHandler.java | 127 +++++++++++++++++- ...ThreadPoolElasticSearchMonitorHandler.java | 126 ++++++++++++++++- 4 files changed, 244 insertions(+), 17 deletions(-) diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/monitor/MonitorHandlersConfigurator.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/monitor/MonitorHandlersConfigurator.java index 30252ec6b7..76509cdda8 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/monitor/MonitorHandlersConfigurator.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/monitor/MonitorHandlersConfigurator.java @@ -159,10 +159,10 @@ private static void handleElasticSearch(MonitorHandlerTypeEnum type, MonitorHand context.monitors.add(new DynamicThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler)); break; case WEB: - context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler)); + context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler()); break; case ADAPTER: - context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler)); + context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler()); break; } } diff --git a/starters/threadpool/monitor/hippo4j-spring-boot-starter-monitor-elasticsearch/src/main/java/cn/hippo4j/springboot/starter/monitor/elasticsearch/ElasticSearchMonitorAutoConfiguration.java b/starters/threadpool/monitor/hippo4j-spring-boot-starter-monitor-elasticsearch/src/main/java/cn/hippo4j/springboot/starter/monitor/elasticsearch/ElasticSearchMonitorAutoConfiguration.java index 5c8bd5c2da..26c79b5d8c 100644 --- a/starters/threadpool/monitor/hippo4j-spring-boot-starter-monitor-elasticsearch/src/main/java/cn/hippo4j/springboot/starter/monitor/elasticsearch/ElasticSearchMonitorAutoConfiguration.java +++ b/starters/threadpool/monitor/hippo4j-spring-boot-starter-monitor-elasticsearch/src/main/java/cn/hippo4j/springboot/starter/monitor/elasticsearch/ElasticSearchMonitorAutoConfiguration.java @@ -44,12 +44,12 @@ public DynamicThreadPoolElasticSearchMonitorHandler dynamicThreadPoolElasticSear @Bean @ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('web')") public WebThreadPoolElasticSearchMonitorHandler webThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) { - return new WebThreadPoolElasticSearchMonitorHandler(handler); + return new WebThreadPoolElasticSearchMonitorHandler(); } @Bean @ConditionalOnExpression("'${spring.dynamic.thread-pool.monitor.thread-pool-types:}'.contains('adapter')") public AdapterThreadPoolElasticSearchMonitorHandler adapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) { - return new AdapterThreadPoolElasticSearchMonitorHandler(handler); + return new AdapterThreadPoolElasticSearchMonitorHandler(); } } diff --git a/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/AdapterThreadPoolElasticSearchMonitorHandler.java b/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/AdapterThreadPoolElasticSearchMonitorHandler.java index b0585a06b6..c985a1f8c7 100644 --- a/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/AdapterThreadPoolElasticSearchMonitorHandler.java +++ b/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/AdapterThreadPoolElasticSearchMonitorHandler.java @@ -17,29 +17,144 @@ package cn.hippo4j.monitor.elasticsearch; +import cn.hippo4j.common.model.ThreadPoolAdapterState; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; +import cn.hippo4j.common.toolkit.BeanUtil; +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.core.config.ApplicationContextHolder; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; +import cn.hippo4j.core.toolkit.FileUtil; +import cn.hippo4j.monitor.base.AbstractAdapterThreadPoolMonitor; import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor; +import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo; import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum; +import lombok.Builder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.springframework.core.env.Environment; +import org.springframework.util.StringUtils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** * Adapter thread-pool elastic-search monitor handler. */ @Slf4j -public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor { +public class AdapterThreadPoolElasticSearchMonitorHandler extends AbstractAdapterThreadPoolMonitor { - public AdapterThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) { - super(handler); + private AtomicBoolean isIndexExist = null; + @Override + protected void execute(ThreadPoolAdapterState threadPoolAdapterState) { + ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(threadPoolAdapterState, ElasticSearchThreadPoolRunStateInfo.class); + Environment environment = ApplicationContextHolder.getInstance().getEnvironment(); + String indexName = environment.getProperty("es.thread-pool-state.index.name", "adapter-thread-pool-state"); + String applicationName = environment.getProperty("spring.application.name", "application"); + if (!this.isExists(indexName)) { + List rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8); + String mapping = String.join(" ", rawMapping); + // if index doesn't exsit, this function may try to create one, but recommend to create index manually. + this.createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build()); + } + esThreadPoolRunStateInfo.setApplicationName(applicationName); + esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis()); + this.log2Es(esThreadPoolRunStateInfo, indexName); } - @Override - protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) { - // TODO + public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + try { + IndexRequest request = new IndexRequest(indexName, "_doc"); + request.id(esThreadPoolRunStateInfo.getId()); + String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo); + request.source(stateJson, XContentType.JSON); + IndexResponse response = client.index(request, RequestOptions.DEFAULT); + log.info("write thread-pool state to es, id is :{}", response.getId()); + } catch (Exception ex) { + log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ", + indexName, + "_doc", + esThreadPoolRunStateInfo.getId(), + ex); + } + } + + public synchronized boolean isExists(String index) { + if (Objects.isNull(isIndexExist)) { + boolean exists = false; + GetIndexRequest request = new GetIndexRequest(index); + try { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + exists = client.indices().exists(request, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error("check es index fail"); + } + isIndexExist = new AtomicBoolean(exists); + } + return isIndexExist.get(); + } + + public void createIndex(AdapterThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + boolean acknowledged = false; + CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex()); + if (StringUtils.hasText(esIndex.getMapping())) { + request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON); + } + if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) { + request.settings(Settings.builder() + .put("index.number_of_shards", esIndex.getShards()) + .put("index.number_of_replicas", esIndex.getReplicas())); + } + if (StringUtils.hasText(esIndex.getAlias())) { + request.alias(new Alias(esIndex.getAlias())); + } + try { + CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); + acknowledged = createIndexResponse.isAcknowledged(); + } catch (IOException e) { + log.error("create es index exception", e); + } + if (acknowledged) { + log.info("create es index success"); + isIndexExist.set(true); + } else { + log.error("create es index fail"); + throw new RuntimeException("cannot auto create thread-pool state es index"); + } } @Override public String getType() { return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase(); } + + /** + * Es Index + */ + @Getter + @Builder + private static class EsIndex { + + String index; + String type; + String mapping; + Integer shards; + Integer replicas; + String alias; + } } + diff --git a/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/WebThreadPoolElasticSearchMonitorHandler.java b/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/WebThreadPoolElasticSearchMonitorHandler.java index 415a9a52c3..13a55a804f 100644 --- a/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/WebThreadPoolElasticSearchMonitorHandler.java +++ b/threadpool/monitor/elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/WebThreadPoolElasticSearchMonitorHandler.java @@ -18,28 +18,140 @@ package cn.hippo4j.monitor.elasticsearch; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; +import cn.hippo4j.common.toolkit.BeanUtil; +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.core.config.ApplicationContextHolder; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; +import cn.hippo4j.core.toolkit.FileUtil; import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor; +import cn.hippo4j.monitor.base.AbstractWebThreadPoolMonitor; +import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo; import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum; +import lombok.Builder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.springframework.core.env.Environment; +import org.springframework.util.StringUtils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** * Web thread-pool elastic-search monitor handler. */ @Slf4j -public class WebThreadPoolElasticSearchMonitorHandler extends AbstractDynamicThreadPoolMonitor { - - public WebThreadPoolElasticSearchMonitorHandler(ThreadPoolRunStateHandler handler) { - super(handler); - } +public class WebThreadPoolElasticSearchMonitorHandler extends AbstractWebThreadPoolMonitor { + private AtomicBoolean isIndexExist = null; @Override protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) { - // TODO + ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ElasticSearchThreadPoolRunStateInfo.class); + Environment environment = ApplicationContextHolder.getInstance().getEnvironment(); + String indexName = environment.getProperty("es.thread-pool-state.index.name", "web-thread-pool-state"); + String applicationName = environment.getProperty("spring.application.name", "application"); + if (!this.isExists(indexName)) { + List rawMapping = FileUtil.readLines("mapping.json", StandardCharsets.UTF_8); + String mapping = String.join(" ", rawMapping); + // if index doesn't exsit, this function may try to create one, but recommend to create index manually. + this.createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build()); + } + esThreadPoolRunStateInfo.setApplicationName(applicationName); + esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis()); + this.log2Es(esThreadPoolRunStateInfo, indexName); } + public void log2Es(ElasticSearchThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + try { + IndexRequest request = new IndexRequest(indexName, "_doc"); + request.id(esThreadPoolRunStateInfo.getId()); + String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo); + request.source(stateJson, XContentType.JSON); + IndexResponse response = client.index(request, RequestOptions.DEFAULT); + log.info("write thread-pool state to es, id is :{}", response.getId()); + } catch (Exception ex) { + log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ", + indexName, + "_doc", + esThreadPoolRunStateInfo.getId(), + ex); + } + } + + public synchronized boolean isExists(String index) { + if (Objects.isNull(isIndexExist)) { + boolean exists = false; + GetIndexRequest request = new GetIndexRequest(index); + try { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + exists = client.indices().exists(request, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error("check es index fail"); + } + isIndexExist = new AtomicBoolean(exists); + } + return isIndexExist.get(); + } + + public void createIndex(WebThreadPoolElasticSearchMonitorHandler.EsIndex esIndex) { + RestHighLevelClient client = ElasticSearchClientHolder.getClient(); + boolean acknowledged = false; + CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex()); + if (StringUtils.hasText(esIndex.getMapping())) { + request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON); + } + if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) { + request.settings(Settings.builder() + .put("index.number_of_shards", esIndex.getShards()) + .put("index.number_of_replicas", esIndex.getReplicas())); + } + if (StringUtils.hasText(esIndex.getAlias())) { + request.alias(new Alias(esIndex.getAlias())); + } + try { + CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); + acknowledged = createIndexResponse.isAcknowledged(); + } catch (IOException e) { + log.error("create es index exception", e); + } + if (acknowledged) { + log.info("create es index success"); + isIndexExist.set(true); + } else { + log.error("create es index fail"); + throw new RuntimeException("cannot auto create thread-pool state es index"); + } + } @Override public String getType() { return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase(); } -} + + /** + * Es Index + */ + @Getter + @Builder + private static class EsIndex { + + String index; + String type; + String mapping; + Integer shards; + Integer replicas; + String alias; + } +} \ No newline at end of file