Skip to content

Commit 4f38500

Browse files
committed
add monitorType JMX
1 parent 99cae08 commit 4f38500

File tree

9 files changed

+386
-2
lines changed

9 files changed

+386
-2
lines changed

docs/installation.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,56 @@ monitor:
420420

421421
Other timeout parameters are not applicable to the JDBC connection.
422422

423+
#### JMX
424+
425+
The monitor type `JMX` can be used as an alternative to collect cluster information,
426+
which is required for the `QueryCountBasedRouterProvider`. This uses the `v1/jmx/mbean`
427+
endpoint on Trino clusters.
428+
429+
To enable this:
430+
431+
[JMX monitoring](https://trino.io/docs/current/admin/jmx.html) must be activated on all Trino clusters with:
432+
433+
```properties
434+
jmx.rmiregistry.port=<port>
435+
jmx.rmiserver.port=<port>
436+
```
437+
438+
Allow JMX endpoint access by adding rules to your [file-based access control](https://trino.io/docs/current/security/file-system-access-control.html)
439+
configuration. Example for `user`:
440+
441+
```json
442+
{
443+
"catalogs": [
444+
{
445+
"user": "user",
446+
"catalog": "system",
447+
"allow": "read-only"
448+
}
449+
],
450+
"system_information": [
451+
{
452+
"user": "user",
453+
"allow": ["read"]
454+
}
455+
]
456+
}
457+
```
458+
459+
Ensure that a username and password are configured by adding the `backendState`
460+
sectionto your configuration. The credentials must be consistent across all
461+
backend clusters and have `read` rights on the `system_information`.
462+
463+
```yaml
464+
backendState:
465+
username: "user"
466+
password: "password"
467+
```
468+
469+
The JMX monitor will use these credentials to authenticate against the
470+
JMX endpoint of each Trino cluster and collect metrics like running queries,
471+
queued queries, and worker nodes information.
472+
423473
#### UI_API
424474

425475
This pulls cluster information from the `ui/api/stats` REST endpoint. This is
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import io.airlift.http.client.BasicAuthRequestFilter;
18+
import io.airlift.http.client.HttpClient;
19+
import io.airlift.http.client.HttpRequestFilter;
20+
import io.airlift.http.client.JsonResponseHandler;
21+
import io.airlift.http.client.Request;
22+
import io.airlift.http.client.UnexpectedResponseException;
23+
import io.airlift.log.Logger;
24+
import io.trino.gateway.ha.config.BackendStateConfiguration;
25+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
26+
27+
import java.net.URI;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import java.util.function.Predicate;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
34+
35+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
36+
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
37+
import static io.airlift.http.client.Request.Builder.prepareGet;
38+
import static io.airlift.json.JsonCodec.jsonCodec;
39+
import static java.util.Objects.requireNonNull;
40+
41+
public class ClusterStatsJmxMonitor
42+
implements ClusterStatsMonitor
43+
{
44+
private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
45+
private static final JsonResponseHandler<JsonNode> JMX_JSON_RESPONSE_HANDLER = createJsonResponseHandler(jsonCodec(JsonNode.class));
46+
private static final String JMX_PATH = "/v1/jmx/mbean";
47+
48+
private final String username;
49+
private final String password;
50+
private final HttpClient client;
51+
52+
public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration)
53+
{
54+
this.client = requireNonNull(client, "client is null");
55+
this.username = backendStateConfiguration.getUsername();
56+
this.password = backendStateConfiguration.getPassword();
57+
}
58+
59+
private static void processDiscoveryNodeManagerStats(JmxResponse response, ClusterStats.Builder clusterStats)
60+
{
61+
try {
62+
response.attributes().stream()
63+
.filter(attribute -> "ActiveNodeCount".equals(attribute.name()))
64+
.findFirst()
65+
.ifPresent(attribute -> {
66+
int activeNodes = attribute.value();
67+
TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY;
68+
clusterStats.numWorkerNodes(activeNodes);
69+
clusterStats.trinoStatus(trinoStatus);
70+
log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s", activeNodes, trinoStatus);
71+
});
72+
}
73+
catch (Exception e) {
74+
log.error(e, "Error parsing DiscoveryNodeManager stats");
75+
clusterStats.trinoStatus(TrinoStatus.UNHEALTHY);
76+
}
77+
}
78+
79+
private static void processQueryManagerStats(JmxResponse response, ClusterStats.Builder clusterStats)
80+
{
81+
try {
82+
List<JmxAttribute> attributeList = response.attributes();
83+
Stream<JmxAttribute> attributeStream = attributeList.stream();
84+
Predicate<JmxAttribute> isRelevantQuery = attribute -> {
85+
String attributeName = attribute.name();
86+
return "QueuedQueries".equals(attributeName) || "RunningQueries".equals(attributeName);
87+
};
88+
Stream<JmxAttribute> filteredAttributes = attributeStream.filter(isRelevantQuery);
89+
Map<String, Integer> stats = filteredAttributes.collect(Collectors.toMap(JmxAttribute::name, JmxAttribute::value));
90+
91+
int queuedQueryCount = stats.getOrDefault("QueuedQueries", 0);
92+
clusterStats.queuedQueryCount(queuedQueryCount);
93+
int runningQueryCount = stats.getOrDefault("RunningQueries", 0);
94+
clusterStats.runningQueryCount(runningQueryCount);
95+
96+
int queuedQueries = stats.get("QueuedQueries");
97+
int runningQueries = stats.get("RunningQueries");
98+
log.debug(String.format("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d", queuedQueries, runningQueries));
99+
}
100+
catch (Exception e) {
101+
log.error(e, "Error parsing QueryManager stats");
102+
}
103+
}
104+
105+
@Override
106+
public ClusterStats monitor(ProxyBackendConfiguration backend)
107+
{
108+
log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
109+
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
110+
111+
processJmxStats(backend, "trino.metadata:name=DiscoveryNodeManager", ClusterStatsJmxMonitor::processDiscoveryNodeManagerStats, clusterStats);
112+
processJmxStats(backend, "trino.execution:name=QueryManager", ClusterStatsJmxMonitor::processQueryManagerStats, clusterStats);
113+
114+
clusterStats.proxyTo(backend.getProxyTo())
115+
.externalUrl(backend.getExternalUrl())
116+
.routingGroup(backend.getRoutingGroup());
117+
118+
ClusterStats stats = clusterStats.build();
119+
log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats);
120+
return stats;
121+
}
122+
123+
private void processJmxStats(ProxyBackendConfiguration backend, String mbeanName, JmxStatProcessor processor, ClusterStats.Builder clusterStats)
124+
{
125+
queryJmx(backend, mbeanName)
126+
.ifPresentOrElse(response -> processor.process(response, clusterStats), () -> clusterStats.trinoStatus(TrinoStatus.UNHEALTHY));
127+
}
128+
129+
private Optional<JmxResponse> queryJmx(ProxyBackendConfiguration backend, String mbeanName)
130+
{
131+
requireNonNull(backend, "backend is null");
132+
requireNonNull(mbeanName, "mbeanName is null");
133+
134+
String jmxUrl = backend.getProxyTo();
135+
Request request;
136+
137+
Request preparedRequest = prepareGet()
138+
.setUri(uriBuilderFrom(URI.create(jmxUrl))
139+
.appendPath(JMX_PATH)
140+
.appendPath(mbeanName)
141+
.build())
142+
.addHeader("X-Trino-User", username)
143+
.build();
144+
145+
boolean isHttps = preparedRequest.getUri().getScheme().equalsIgnoreCase("https");
146+
147+
if (isHttps) {
148+
HttpRequestFilter filter = new BasicAuthRequestFilter(username, password);
149+
request = filter.filterRequest(preparedRequest);
150+
}
151+
else {
152+
request = preparedRequest;
153+
}
154+
155+
log.debug("Querying JMX at %s for %s", request.getUri(), mbeanName);
156+
157+
try {
158+
JsonNode response = client.execute(request, JMX_JSON_RESPONSE_HANDLER);
159+
return Optional.ofNullable(response).map(JmxResponse::fromJson);
160+
}
161+
catch (UnexpectedResponseException e) {
162+
log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode());
163+
return Optional.empty();
164+
}
165+
catch (Exception e) {
166+
log.error(e, "Exception while querying JMX at %s", jmxUrl);
167+
return Optional.empty();
168+
}
169+
}
170+
171+
@FunctionalInterface
172+
private interface JmxStatProcessor
173+
{
174+
void process(JmxResponse response, ClusterStats.Builder clusterStats);
175+
}
176+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
18+
public record JmxAttribute(String name, int value)
19+
{
20+
public static JmxAttribute fromJson(JsonNode json)
21+
{
22+
return new JmxAttribute(
23+
json.get("name").asText(),
24+
json.get("value").asInt());
25+
}
26+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.fasterxml.jackson.databind.JsonNode;
17+
import com.google.common.collect.ImmutableList;
18+
19+
import java.util.List;
20+
import java.util.stream.StreamSupport;
21+
22+
import static com.google.common.collect.ImmutableList.toImmutableList;
23+
24+
public record JmxResponse(List<JmxAttribute> attributes)
25+
{
26+
public JmxResponse
27+
{
28+
attributes = ImmutableList.copyOf(attributes);
29+
}
30+
31+
public static JmxResponse fromJson(JsonNode json)
32+
{
33+
List<JmxAttribute> attributes = StreamSupport.stream(json.get("attributes").spliterator(), false)
34+
.map(JmxAttribute::fromJson)
35+
.collect(toImmutableList());
36+
return new JmxResponse(attributes);
37+
}
38+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
JMX
2223
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
2323
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2424
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
25+
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
2526
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2627
import io.trino.gateway.ha.clustermonitor.ClusterStatsObserver;
2728
import io.trino.gateway.ha.clustermonitor.ForMonitor;
@@ -228,6 +229,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
228229
case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, configuration.getMonitor());
229230
case UI_API -> new ClusterStatsHttpMonitor(configuration.getBackendState());
230231
case JDBC -> new ClusterStatsJdbcMonitor(configuration.getBackendState(), configuration.getMonitor());
232+
case JMX -> new ClusterStatsJmxMonitor(httpClient, configuration.getBackendState());
231233
case NOOP -> new NoopClusterStatsMonitor();
232234
};
233235
}

0 commit comments

Comments
 (0)