Skip to content

Commit fb4ac57

Browse files
committed
add monitorType JMX
1 parent 785844b commit fb4ac57

File tree

5 files changed

+195
-19
lines changed

5 files changed

+195
-19
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import io.airlift.http.client.HttpStatus;
19+
import io.airlift.log.Logger;
20+
import io.trino.gateway.ha.config.BackendStateConfiguration;
21+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
22+
import okhttp3.Call;
23+
import okhttp3.Credentials;
24+
import okhttp3.OkHttpClient;
25+
import okhttp3.Request;
26+
import okhttp3.Response;
27+
28+
import java.io.IOException;
29+
30+
import static io.airlift.http.client.HttpStatus.fromStatusCode;
31+
32+
public class ClusterStatsJmxMonitor
33+
implements ClusterStatsMonitor
34+
{
35+
private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
36+
37+
private final String username;
38+
private final String password;
39+
private static final String JMX_PATH = "/v1/jmx";
40+
41+
public ClusterStatsJmxMonitor(BackendStateConfiguration backendStateConfiguration)
42+
{
43+
this.username = backendStateConfiguration.getUsername();
44+
this.password = backendStateConfiguration.getPassword();
45+
}
46+
47+
@Override
48+
public ClusterStats monitor(ProxyBackendConfiguration backend)
49+
{
50+
log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
51+
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
52+
53+
// Fetch DiscoveryNodeManager stats
54+
String discoveryResponse = queryJmx(backend, "trino.metadata:name=DiscoveryNodeManager");
55+
if (discoveryResponse != null) {
56+
processDiscoveryNodeManagerStats(discoveryResponse, clusterStats);
57+
}
58+
59+
// Fetch QueryManager stats
60+
String queryResponse = queryJmx(backend, "trino.execution:name=QueryManager");
61+
if (queryResponse != null) {
62+
processQueryManagerStats(queryResponse, clusterStats);
63+
}
64+
65+
// Set additional fields
66+
clusterStats.proxyTo(backend.getProxyTo())
67+
.externalUrl(backend.getExternalUrl())
68+
.routingGroup(backend.getRoutingGroup());
69+
70+
ClusterStats stats = clusterStats.build();
71+
log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats);
72+
return stats;
73+
}
74+
75+
private void processDiscoveryNodeManagerStats(String response, ClusterStats.Builder clusterStats)
76+
{
77+
try {
78+
JsonNode rootNode = new ObjectMapper().readTree(response);
79+
JsonNode attributes = rootNode.get("attributes");
80+
if (attributes.isArray()) {
81+
for (JsonNode attribute : attributes) {
82+
if ("ActiveNodeCount".equals(attribute.get("name").asText())) {
83+
int activeNodes = attribute.get("value").asInt();
84+
clusterStats.numWorkerNodes(activeNodes)
85+
.healthy(activeNodes - 1 > 0);
86+
log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s",
87+
activeNodes, activeNodes - 1 > 0 ? "Healthy" : "Unhealthy");
88+
break;
89+
}
90+
}
91+
}
92+
}
93+
catch (Exception e) {
94+
log.error(e, "Error parsing DiscoveryNodeManager stats");
95+
}
96+
}
97+
98+
private void processQueryManagerStats(String response, ClusterStats.Builder clusterStats)
99+
{
100+
try {
101+
JsonNode rootNode = new ObjectMapper().readTree(response);
102+
JsonNode attributes = rootNode.get("attributes");
103+
if (attributes.isArray()) {
104+
int queuedQueries = 0;
105+
int runningQueries = 0;
106+
for (JsonNode attribute : attributes) {
107+
String name = attribute.get("name").asText();
108+
if ("QueuedQueries".equals(name)) {
109+
queuedQueries = attribute.get("value").asInt();
110+
}
111+
else if ("RunningQueries".equals(name)) {
112+
runningQueries = attribute.get("value").asInt();
113+
}
114+
}
115+
clusterStats.queuedQueryCount(queuedQueries).runningQueryCount(runningQueries);
116+
log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d",
117+
queuedQueries, runningQueries);
118+
}
119+
}
120+
catch (Exception e) {
121+
log.error(e, "Error parsing QueryManager stats");
122+
}
123+
}
124+
125+
private String queryJmx(ProxyBackendConfiguration backend, String mbeanName)
126+
{
127+
String jmxUrl = backend.getProxyTo() + JMX_PATH + "/" + mbeanName;
128+
log.debug("Querying JMX at URL: %s", jmxUrl);
129+
OkHttpClient client = new OkHttpClient.Builder().build();
130+
131+
Request request = new Request.Builder()
132+
.url(jmxUrl)
133+
.addHeader("Authorization", Credentials.basic(username, password))
134+
.get()
135+
.build();
136+
137+
Call call = client.newCall(request);
138+
139+
try (Response res = call.execute()) {
140+
if (fromStatusCode(res.code()) == HttpStatus.OK) {
141+
log.debug("Successful JMX response for %s", mbeanName);
142+
return res.body().string();
143+
}
144+
else {
145+
log.error("Failed to fetch JMX data for %s, response code: %d", mbeanName, res.code());
146+
return null;
147+
}
148+
}
149+
catch (IOException e) {
150+
log.error(e, "Error querying JMX for %s", mbeanName);
151+
return null;
152+
}
153+
}
154+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
JMX
2223
}

gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,47 +31,49 @@
3131
import static org.testcontainers.utility.MountableFile.forClasspathResource;
3232

3333
@TestInstance(PER_CLASS)
34-
public class TestClusterStatsMonitor
35-
{
34+
final class TestClusterStatsMonitor {
3635
private TrinoContainer trino;
3736

3837
@BeforeAll
39-
public void setUp()
40-
{
38+
void setUp() {
4139
trino = new TrinoContainer("trinodb/trino");
4240
trino.withCopyFileToContainer(forClasspathResource("trino-config.properties"), "/etc/trino/config.properties");
41+
trino.withCopyFileToContainer(forClasspathResource("jvm.config"), "/etc/trino/jvm.config");
4342
trino.start();
4443
}
4544

4645
@AfterAll
47-
public void setup()
48-
{
46+
void setup() {
4947
trino.close();
5048
}
5149

5250
@Test
53-
public void testHttpMonitor()
54-
{
51+
void testHttpMonitor() {
5552
testClusterStatsMonitor(ClusterStatsHttpMonitor::new);
5653
}
5754

5855
@Test
59-
public void testJdbcMonitor()
60-
{
61-
testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration, new MonitorConfiguration()));
56+
void testJdbcMonitor() {
57+
testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration,
58+
new MonitorConfiguration()));
6259
}
6360

6461
@Test
65-
public void testInfoApiMonitor()
66-
{
62+
void testJmxMonitor() {
63+
testClusterStatsMonitor(ClusterStatsJmxMonitor::new);
64+
}
65+
66+
@Test
67+
void testInfoApiMonitor() {
6768
MonitorConfiguration monitorConfigurationWithRetries = new MonitorConfiguration();
6869
monitorConfigurationWithRetries.setRetries(10);
69-
testClusterStatsMonitor(ignored -> new ClusterStatsInfoApiMonitor(new JettyHttpClient(new HttpClientConfig()), new MonitorConfiguration()));
70-
testClusterStatsMonitor(ignored -> new ClusterStatsInfoApiMonitor(new JettyHttpClient(new HttpClientConfig()), monitorConfigurationWithRetries));
70+
testClusterStatsMonitor(ignored -> new ClusterStatsInfoApiMonitor(new JettyHttpClient(new HttpClientConfig()),
71+
new MonitorConfiguration()));
72+
testClusterStatsMonitor(ignored -> new ClusterStatsInfoApiMonitor(new JettyHttpClient(new HttpClientConfig()),
73+
monitorConfigurationWithRetries));
7174
}
7275

73-
private void testClusterStatsMonitor(Function<BackendStateConfiguration, ClusterStatsMonitor> monitorFactory)
74-
{
76+
private void testClusterStatsMonitor(Function<BackendStateConfiguration, ClusterStatsMonitor> monitorFactory) {
7577
BackendStateConfiguration backendStateConfiguration = new BackendStateConfiguration();
7678
backendStateConfiguration.setUsername("test_user");
7779
ClusterStatsMonitor monitor = monitorFactory.apply(backendStateConfiguration);
@@ -82,6 +84,6 @@ private void testClusterStatsMonitor(Function<BackendStateConfiguration, Cluster
8284

8385
ClusterStats stats = monitor.monitor(proxyBackend);
8486
assertThat(stats.clusterId()).isEqualTo("test_cluster");
85-
assertThat(stats.healthy()).isTrue();
87+
assertThat(stats.trinoStatus()).isEqualTo(TrinoStatus.HEALTHY);
8688
}
8789
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-server
2+
-XX:InitialRAMPercentage=80
3+
-XX:MaxRAMPercentage=80
4+
-XX:G1HeapRegionSize=32M
5+
-XX:+ExplicitGCInvokesConcurrent
6+
-XX:+ExitOnOutOfMemoryError
7+
-XX:+HeapDumpOnOutOfMemoryError
8+
-XX:-OmitStackTraceInFastThrow
9+
-XX:ReservedCodeCacheSize=512M
10+
-XX:PerMethodRecompilationCutoff=10000
11+
-XX:PerBytecodeRecompilationCutoff=10000
12+
-Djdk.attach.allowAttachSelf=true
13+
-Djdk.nio.maxCachedBufferSize=2000000
14+
-Dfile.encoding=UTF-8
15+
# Allow loading dynamic agent used by JOL
16+
-XX:+EnableDynamicAgentLoading
17+
-Dcom.sun.management.jmxremote.rmi.port=9081

gateway-ha/src/test/resources/trino-config.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@ catalog.management=${ENV:CATALOG_MANAGEMENT}
77

88
# Customize
99
http-server.process-forwarded=true
10+
jmx.rmiregistry.port=9080
11+
jmx.rmiserver.port=9081

0 commit comments

Comments
 (0)