Skip to content

Commit 40cb273

Browse files
committed
Add monitorType JMX
1 parent 785844b commit 40cb273

File tree

5 files changed

+177
-1
lines changed

5 files changed

+177
-1
lines changed

docs/installation.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,25 @@ supported for legacy reasons and may be deprecated in the future. It is only
331331
supported for backend clusters with `web-ui.authentication.type = PASSWORD`. Set
332332
a username and password using `backendState` as with the `JDBC` option.
333333

334+
#### JMX
335+
336+
This pulls cluster information from the `v1/jmx/mbean` endpoint using the following
337+
metrics:
338+
- `trino.execution:name=QueryManager`
339+
- `trino.metadata:name=DiscoveryNodeManager`
340+
341+
Configure a username and password by adding `backendState` to your configuration.
342+
The username and password must be valid across all clusters. The user requires
343+
`read` rights on `system_information` on all clusters.
344+
345+
```yaml
346+
backendState:
347+
  username: "user"
348+
  password: "password"
349+
clusterStatsConfiguration:
350+
  monitorType: JMX
351+
```
352+
334353
#### NOOP
335354

336355
This option disables health checks.
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import io.airlift.log.Logger;
19+
import io.trino.gateway.ha.config.BackendStateConfiguration;
20+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
21+
import okhttp3.Call;
22+
import okhttp3.Credentials;
23+
import okhttp3.OkHttpClient;
24+
import okhttp3.Request;
25+
import okhttp3.Response;
26+
27+
import java.io.IOException;
28+
29+
import static io.airlift.http.client.HttpStatus.fromStatusCode;
30+
import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.JMX_PATH;
31+
32+
public class ClusterStatsJmxMonitor
33+
implements ClusterStatsMonitor
34+
{
35+
private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
36+
37+
private final String username;
38+
private final String password;
39+
40+
public ClusterStatsJmxMonitor(BackendStateConfiguration backendStateConfiguration)
41+
{
42+
this.username = backendStateConfiguration.getUsername();
43+
this.password = backendStateConfiguration.getPassword();
44+
}
45+
46+
@Override
47+
public ClusterStats monitor(ProxyBackendConfiguration backend)
48+
{
49+
log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
50+
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
51+
52+
// Fetch DiscoveryNodeManager stats
53+
String discoveryResponse = queryJmx(backend, "trino.metadata:name=DiscoveryNodeManager");
54+
if (discoveryResponse != null) {
55+
processDiscoveryNodeManagerStats(discoveryResponse, clusterStats);
56+
}
57+
58+
// Fetch QueryManager stats
59+
String queryResponse = queryJmx(backend, "trino.execution:name=QueryManager");
60+
if (queryResponse != null) {
61+
processQueryManagerStats(queryResponse, clusterStats);
62+
}
63+
64+
// Set additional fields
65+
clusterStats.proxyTo(backend.getProxyTo())
66+
.externalUrl(backend.getExternalUrl())
67+
.routingGroup(backend.getRoutingGroup());
68+
69+
ClusterStats stats = clusterStats.build();
70+
log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats);
71+
return stats;
72+
}
73+
74+
private void processDiscoveryNodeManagerStats(String response, ClusterStats.Builder clusterStats)
75+
{
76+
try {
77+
JsonNode rootNode = new ObjectMapper().readTree(response);
78+
JsonNode attributes = rootNode.get("attributes");
79+
if (attributes.isArray()) {
80+
for (JsonNode attribute : attributes) {
81+
if ("ActiveNodeCount".equals(attribute.get("name").asText())) {
82+
int activeNodes = attribute.get("value").asInt();
83+
clusterStats.numWorkerNodes(activeNodes)
84+
.healthy(activeNodes - 1 > 0);
85+
log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s",
86+
activeNodes, activeNodes - 1 > 0 ? "Healthy" : "Unhealthy");
87+
break;
88+
}
89+
}
90+
}
91+
}
92+
catch (Exception e) {
93+
log.error(e, "Error parsing DiscoveryNodeManager stats");
94+
}
95+
}
96+
97+
private void processQueryManagerStats(String response, ClusterStats.Builder clusterStats)
98+
{
99+
try {
100+
JsonNode rootNode = new ObjectMapper().readTree(response);
101+
JsonNode attributes = rootNode.get("attributes");
102+
if (attributes.isArray()) {
103+
int queuedQueries = 0;
104+
int runningQueries = 0;
105+
for (JsonNode attribute : attributes) {
106+
String name = attribute.get("name").asText();
107+
if ("QueuedQueries".equals(name)) {
108+
queuedQueries = attribute.get("value").asInt();
109+
}
110+
else if ("RunningQueries".equals(name)) {
111+
runningQueries = attribute.get("value").asInt();
112+
}
113+
}
114+
clusterStats.queuedQueryCount(queuedQueries).runningQueryCount(runningQueries);
115+
log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d",
116+
queuedQueries, runningQueries);
117+
}
118+
}
119+
catch (Exception e) {
120+
log.error(e, "Error parsing QueryManager stats");
121+
}
122+
}
123+
124+
private String queryJmx(ProxyBackendConfiguration backend, String mbeanName)
125+
{
126+
String jmxUrl = backend.getProxyTo() + JMX_PATH + "/" + mbeanName;
127+
log.debug("Querying JMX at URL: %s", jmxUrl);
128+
OkHttpClient client = new OkHttpClient.Builder().build();
129+
130+
Request request = new Request.Builder()
131+
.url(jmxUrl)
132+
.addHeader("Authorization", Credentials.basic(username, password))
133+
.get()
134+
.build();
135+
136+
Call call = client.newCall(request);
137+
138+
try (Response res = call.execute()) {
139+
if (fromStatusCode(res.code()) == io.airlift.http.client.HttpStatus.OK) {
140+
log.debug("Successful JMX response for %s", mbeanName);
141+
return res.body().string();
142+
}
143+
else {
144+
log.error("Failed to fetch JMX data for %s, response code: %d", mbeanName, res.code());
145+
return null;
146+
}
147+
}
148+
catch (IOException e) {
149+
log.error(e, "Error querying JMX for %s", mbeanName);
150+
return null;
151+
}
152+
}
153+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
JMX
2223
}

gateway-ha/src/main/java/io/trino/gateway/ha/handler/QueryIdCachingProxyHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class QueryIdCachingProxyHandler
2626
public static final String OAUTH_PATH = "/oauth2";
2727
public static final String AUTHORIZATION = "Authorization";
2828
public static final String USER_HEADER = "X-Trino-User";
29+
public static final String JMX_PATH = "/v1/jmx/mbean";
2930

3031
private QueryIdCachingProxyHandler() {}
3132
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
2121
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2222
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
23+
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
2324
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2425
import io.trino.gateway.ha.clustermonitor.ForMonitor;
2526
import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor;
@@ -50,6 +51,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
5051
case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, config.getMonitor());
5152
case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState());
5253
case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor());
54+
case JMX -> new ClusterStatsJmxMonitor(config.getBackendState());
5355
case NOOP -> new NoopClusterStatsMonitor();
5456
};
5557
}

0 commit comments

Comments
 (0)