Skip to content

Commit 17bcc60

Browse files
committed
add monitorType JMX
1 parent 99cae08 commit 17bcc60

File tree

9 files changed

+385
-2
lines changed

9 files changed

+385
-2
lines changed

docs/installation.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,56 @@ monitor:
420420

421421
Other timeout parameters are not applicable to the JDBC connection.
422422

423+
#### JMX
424+
425+
The monitor type `JMX` can be used as an alternative to collect cluster information,
426+
which is required for the `QueryCountBasedRouterProvider`. This uses the `v1/jmx/mbean`
427+
endpoint on Trino clusters.
428+
429+
To enable this:
430+
431+
[JMX monitoring](https://trino.io/docs/current/admin/jmx.html) must be activated on all Trino clusters with:
432+
433+
```properties
434+
jmx.rmiregistry.port=<port>
435+
jmx.rmiserver.port=<port>
436+
```
437+
438+
Allow JMX endpoint access by adding rules to your [file-based access control](https://trino.io/docs/current/security/file-system-access-control.html)
439+
configuration. Example for `user`:
440+
441+
```json
442+
{
443+
"catalogs": [
444+
{
445+
"user": "user",
446+
"catalog": "system",
447+
"allow": "read-only"
448+
}
449+
],
450+
"system_information": [
451+
{
452+
"user": "user",
453+
"allow": ["read"]
454+
}
455+
]
456+
}
457+
```
458+
459+
Ensure that a username and password are configured by adding the `backendState`
460+
sectionto your configuration. The credentials must be consistent across all
461+
backend clusters and have `read` rights on the `system_information`.
462+
463+
```yaml
464+
backendState:
465+
username: "user"
466+
password: "password"
467+
```
468+
469+
The JMX monitor will use these credentials to authenticate against the
470+
JMX endpoint of each Trino cluster and collect metrics like running queries,
471+
queued queries, and worker nodes information.
472+
423473
#### UI_API
424474

425475
This pulls cluster information from the `ui/api/stats` REST endpoint. This is
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import io.airlift.http.client.BasicAuthRequestFilter;
18+
import io.airlift.http.client.HttpClient;
19+
import io.airlift.http.client.HttpRequestFilter;
20+
import io.airlift.http.client.JsonResponseHandler;
21+
import io.airlift.http.client.Request;
22+
import io.airlift.http.client.UnexpectedResponseException;
23+
import io.airlift.log.Logger;
24+
import io.trino.gateway.ha.config.BackendStateConfiguration;
25+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
26+
27+
import java.net.URI;
28+
import java.util.Map;
29+
import java.util.Optional;
30+
import java.util.stream.Collectors;
31+
32+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
33+
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
34+
import static io.airlift.http.client.Request.Builder.prepareGet;
35+
import static io.airlift.json.JsonCodec.jsonCodec;
36+
import static java.util.Objects.requireNonNull;
37+
38+
public class ClusterStatsJmxMonitor
39+
implements ClusterStatsMonitor
40+
{
41+
private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
42+
private static final JsonResponseHandler<JsonNode> JMX_JSON_RESPONSE_HANDLER = createJsonResponseHandler(
43+
jsonCodec(JsonNode.class));
44+
private static final String JMX_PATH = "/v1/jmx/mbean";
45+
46+
private final String username;
47+
private final String password;
48+
private final HttpClient client;
49+
50+
public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration)
51+
{
52+
this.client = requireNonNull(client, "client is null");
53+
this.username = backendStateConfiguration.getUsername();
54+
this.password = backendStateConfiguration.getPassword();
55+
}
56+
57+
@Override
58+
public ClusterStats monitor(ProxyBackendConfiguration backend)
59+
{
60+
log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
61+
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
62+
63+
processJmxStats(backend, "trino.metadata:name=DiscoveryNodeManager", ClusterStatsJmxMonitor::processDiscoveryNodeManagerStats,
64+
clusterStats);
65+
processJmxStats(backend, "trino.execution:name=QueryManager", ClusterStatsJmxMonitor::processQueryManagerStats, clusterStats);
66+
67+
clusterStats.proxyTo(backend.getProxyTo())
68+
.externalUrl(backend.getExternalUrl())
69+
.routingGroup(backend.getRoutingGroup());
70+
71+
ClusterStats stats = clusterStats.build();
72+
log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats);
73+
return stats;
74+
}
75+
76+
private void processJmxStats(ProxyBackendConfiguration backend, String mbeanName,
77+
JmxStatProcessor processor, ClusterStats.Builder clusterStats)
78+
{
79+
queryJmx(backend, mbeanName)
80+
.ifPresentOrElse(
81+
response -> processor.process(response, clusterStats),
82+
() -> clusterStats.trinoStatus(TrinoStatus.UNHEALTHY));
83+
}
84+
85+
private static void processDiscoveryNodeManagerStats(JmxResponse response, ClusterStats.Builder clusterStats)
86+
{
87+
try {
88+
response.attributes().stream()
89+
.filter(attr -> "ActiveNodeCount".equals(attr.name()))
90+
.findFirst()
91+
.ifPresent(attr -> {
92+
int activeNodes = attr.value();
93+
TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY;
94+
clusterStats.numWorkerNodes(activeNodes)
95+
.trinoStatus(trinoStatus);
96+
log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s",
97+
activeNodes, trinoStatus);
98+
});
99+
}
100+
catch (Exception e) {
101+
log.error(e, "Error parsing DiscoveryNodeManager stats");
102+
clusterStats.trinoStatus(TrinoStatus.UNHEALTHY);
103+
}
104+
}
105+
106+
private static void processQueryManagerStats(JmxResponse response, ClusterStats.Builder clusterStats)
107+
{
108+
try {
109+
Map<String, Integer> stats = response.attributes().stream()
110+
.filter(attr -> "QueuedQueries".equals(attr.name()) ||
111+
"RunningQueries".equals(attr.name()))
112+
.collect(Collectors.toMap(
113+
JmxAttribute::name,
114+
JmxAttribute::value));
115+
116+
clusterStats.queuedQueryCount(stats.getOrDefault("QueuedQueries", 0))
117+
.runningQueryCount(stats.getOrDefault("RunningQueries", 0));
118+
119+
int queuedQueries = stats.get("QueuedQueries");
120+
int runningQueries = stats.get("RunningQueries");
121+
String message = String.format("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d",
122+
queuedQueries, runningQueries);
123+
log.debug(message);
124+
}
125+
catch (Exception e) {
126+
log.error(e, "Error parsing QueryManager stats");
127+
}
128+
}
129+
130+
private Optional<JmxResponse> queryJmx(ProxyBackendConfiguration backend, String mbeanName)
131+
{
132+
requireNonNull(backend, "backend is null");
133+
requireNonNull(mbeanName, "mbeanName is null");
134+
135+
String jmxUrl = backend.getProxyTo();
136+
Request request;
137+
138+
Request preparedRequest = prepareGet()
139+
.setUri(uriBuilderFrom(URI.create(jmxUrl))
140+
.appendPath(JMX_PATH)
141+
.appendPath(mbeanName)
142+
.build())
143+
.addHeader("X-Trino-User", username)
144+
.build();
145+
146+
boolean isHttps = preparedRequest.getUri().getScheme().equalsIgnoreCase("https");
147+
148+
if (isHttps) {
149+
HttpRequestFilter filter = new BasicAuthRequestFilter(username, password);
150+
request = filter.filterRequest(preparedRequest);
151+
}
152+
else {
153+
request = preparedRequest;
154+
}
155+
156+
log.debug("Querying JMX at %s for %s", request.getUri(), mbeanName);
157+
158+
try {
159+
JsonNode response = client.execute(request, JMX_JSON_RESPONSE_HANDLER);
160+
return Optional.ofNullable(response).map(JmxResponse::fromJson);
161+
}
162+
catch (UnexpectedResponseException e) {
163+
log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode());
164+
return Optional.empty();
165+
}
166+
catch (Exception e) {
167+
log.error(e, "Exception while querying JMX at %s", jmxUrl);
168+
return Optional.empty();
169+
}
170+
}
171+
172+
@FunctionalInterface
173+
private interface JmxStatProcessor
174+
{
175+
void process(JmxResponse response, ClusterStats.Builder clusterStats);
176+
}
177+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
18+
public record JmxAttribute(String name, int value)
19+
{
20+
public static JmxAttribute fromJson(JsonNode json)
21+
{
22+
return new JmxAttribute(
23+
json.get("name").asText(),
24+
json.get("value").asInt());
25+
}
26+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import com.google.common.collect.ImmutableList;
18+
19+
import java.util.List;
20+
import java.util.stream.StreamSupport;
21+
22+
public record JmxResponse(List<JmxAttribute> attributes)
23+
{
24+
public JmxResponse
25+
{
26+
attributes = ImmutableList.copyOf(attributes);
27+
}
28+
29+
public static JmxResponse fromJson(JsonNode json)
30+
{
31+
List<JmxAttribute> attributes = StreamSupport.stream(json.get("attributes").spliterator(), false)
32+
.map(JmxAttribute::fromJson)
33+
.collect(ImmutableList.toImmutableList());
34+
return new JmxResponse(attributes);
35+
}
36+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
JMX
2223
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
2323
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2424
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
25+
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
2526
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2627
import io.trino.gateway.ha.clustermonitor.ClusterStatsObserver;
2728
import io.trino.gateway.ha.clustermonitor.ForMonitor;
@@ -228,6 +229,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
228229
case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, configuration.getMonitor());
229230
case UI_API -> new ClusterStatsHttpMonitor(configuration.getBackendState());
230231
case JDBC -> new ClusterStatsJdbcMonitor(configuration.getBackendState(), configuration.getMonitor());
232+
case JMX -> new ClusterStatsJmxMonitor(httpClient, configuration.getBackendState());
231233
case NOOP -> new NoopClusterStatsMonitor();
232234
};
233235
}

0 commit comments

Comments
 (0)