Skip to content

Commit bd2f513

Browse files
willmostlyebyhr
andauthored
Add a monitor for the OpenMetrics endpoint
This populates the running and queued query metrics for active load balancing, and allows defining health using minimum and maximum values for arbitrary metrics Co-authored-by: Yuya Ebihara <[email protected]>
1 parent dfcc2dd commit bd2f513

File tree

9 files changed

+363
-16
lines changed

9 files changed

+363
-16
lines changed

docs/installation.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,38 @@ monitor:
387387

388388
All timeout parameters are optional.
389389

390+
#### METRICS
391+
392+
This pulls statistics from Trino's [OpenMetrics](https://openmetrics.io/) endpoint.
393+
It retrieves the number of running and queued queries for use with
394+
the `QueryCountBasedRouter` (either `METRICS` or `JDBC` must be enabled if
395+
`QueryCountBasedRouter` is used).
396+
397+
This monitor allows customizing health definitions by comparing metrics to fixed
398+
values. This is configured through two maps: `metricMinimumValues` and
399+
`metricMaximumValues`. The keys of these maps are the metric names, and the values
400+
are the minimum or maximum values (inclusive) that are considered healthy. By default,
401+
the only metric populated is:
402+
403+
```yaml
404+
monitorConfiguration:
405+
metricMinimumValues:
406+
trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount: 1
407+
```
408+
409+
This requires the cluster to have at least one active worker node in order to be considered
410+
healthy. The map is overwritten if configured explicitly. For example, to increase the minimum
411+
worker count to 10 and disqualify clusters that have been experiencing frequent major Garbage
412+
Collections, set
413+
414+
```yaml
415+
monitorConfiguration:
416+
metricMinimumValues:
417+
trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount: 10
418+
metricMaximumValues:
419+
io_airlift_stats_name_GcMonitor_MajorGc_FiveMinutes_count: 2
420+
```
421+
390422
#### JDBC
391423

392424
This uses a JDBC connection to query `system.runtime` tables for cluster

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
2828
import static io.airlift.http.client.Request.Builder.prepareGet;
2929
import static io.airlift.json.JsonCodec.jsonCodec;
30-
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
31-
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
32-
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
30+
import static io.trino.gateway.ha.clustermonitor.MonitorUtils.shouldRetry;
3331
import static java.util.Objects.requireNonNull;
3432

3533
public class ClusterStatsInfoApiMonitor
@@ -88,16 +86,4 @@ private TrinoStatus checkStatus(String baseUrl, int retriesRemaining)
8886
}
8987
return TrinoStatus.UNHEALTHY;
9088
}
91-
92-
public static boolean shouldRetry(int statusCode)
93-
{
94-
switch (statusCode) {
95-
case HTTP_BAD_GATEWAY:
96-
case HTTP_UNAVAILABLE:
97-
case HTTP_GATEWAY_TIMEOUT:
98-
return true;
99-
default:
100-
return false;
101-
}
102-
}
10389
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import com.google.common.collect.ImmutableSet;
18+
import io.airlift.http.client.HttpClient;
19+
import io.airlift.http.client.HttpUriBuilder;
20+
import io.airlift.http.client.Request;
21+
import io.airlift.http.client.Response;
22+
import io.airlift.http.client.ResponseHandler;
23+
import io.airlift.http.client.UnexpectedResponseException;
24+
import io.airlift.log.Logger;
25+
import io.trino.gateway.ha.config.BackendStateConfiguration;
26+
import io.trino.gateway.ha.config.MonitorConfiguration;
27+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
28+
import io.trino.gateway.ha.security.util.BasicCredentials;
29+
30+
import java.io.IOException;
31+
import java.net.URI;
32+
import java.util.Arrays;
33+
import java.util.Map;
34+
import java.util.Set;
35+
36+
import static com.google.common.base.Strings.isNullOrEmpty;
37+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
38+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
39+
import static io.airlift.http.client.Request.Builder.prepareGet;
40+
import static io.airlift.http.client.ResponseHandlerUtils.propagate;
41+
import static io.trino.gateway.ha.clustermonitor.MonitorUtils.shouldRetry;
42+
import static java.lang.String.format;
43+
import static java.nio.charset.StandardCharsets.UTF_8;
44+
import static java.util.Objects.requireNonNull;
45+
46+
public class ClusterStatsMetricsMonitor
47+
implements ClusterStatsMonitor
48+
{
49+
public static final String RUNNING_QUERIES_METRIC = "trino_execution_name_QueryManager_RunningQueries";
50+
public static final String QUEUED_QUERIES_METRIC = "trino_execution_name_QueryManager_QueuedQueries";
51+
private static final Logger log = Logger.get(ClusterStatsMetricsMonitor.class);
52+
53+
private final HttpClient httpClient;
54+
private final int retries;
55+
private final MetricsResponseHandler metricsResponseHandler;
56+
private final Header identityHeader;
57+
private final String metricsEndpoint;
58+
private final ImmutableSet<String> metricNames;
59+
private final Map<String, Float> metricMinimumValues;
60+
private final Map<String, Float> metricMaximumValues;
61+
62+
public ClusterStatsMetricsMonitor(HttpClient httpClient, BackendStateConfiguration backendStateConfiguration, MonitorConfiguration monitorConfiguration)
63+
{
64+
this.httpClient = requireNonNull(httpClient, "httpClient is null");
65+
retries = monitorConfiguration.getRetries();
66+
if (!isNullOrEmpty(backendStateConfiguration.getPassword())) {
67+
identityHeader = new Header("Authorization",
68+
new BasicCredentials(backendStateConfiguration.getUsername(), backendStateConfiguration.getPassword()).getBasicAuthHeader());
69+
}
70+
else {
71+
identityHeader = new Header("X-Trino-User", backendStateConfiguration.getUsername());
72+
}
73+
metricsEndpoint = monitorConfiguration.getMetricsEndpoint();
74+
metricMinimumValues = ImmutableMap.copyOf(monitorConfiguration.getMetricMinimumValues());
75+
metricMaximumValues = ImmutableMap.copyOf(monitorConfiguration.getMetricMaximumValues());
76+
metricNames = ImmutableSet.<String>builder()
77+
.add(RUNNING_QUERIES_METRIC, QUEUED_QUERIES_METRIC)
78+
.addAll(metricMinimumValues.keySet())
79+
.addAll(metricMaximumValues.keySet())
80+
.build();
81+
metricsResponseHandler = new MetricsResponseHandler(metricNames);
82+
}
83+
84+
private static ClusterStats getUnhealthyStats(ProxyBackendConfiguration backend)
85+
{
86+
return ClusterStats.builder(backend.getName())
87+
.trinoStatus(TrinoStatus.UNHEALTHY)
88+
.proxyTo(backend.getProxyTo())
89+
.externalUrl(backend.getExternalUrl())
90+
.routingGroup(backend.getRoutingGroup())
91+
.build();
92+
}
93+
94+
@Override
95+
public ClusterStats monitor(ProxyBackendConfiguration backend)
96+
{
97+
Map<String, String> metrics = getMetrics(backend.getProxyTo(), retries);
98+
if (metrics.isEmpty()) {
99+
log.error("No metrics available for %s!", backend.getName());
100+
return getUnhealthyStats(backend);
101+
}
102+
103+
for (Map.Entry<String, Float> entry : metricMinimumValues.entrySet()) {
104+
if (!metrics.containsKey(entry.getKey())
105+
|| Float.parseFloat(metrics.get(entry.getKey())) < entry.getValue()) {
106+
log.warn("Health metric value below min for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey()));
107+
return getUnhealthyStats(backend);
108+
}
109+
}
110+
111+
for (Map.Entry<String, Float> entry : metricMaximumValues.entrySet()) {
112+
if (!metrics.containsKey(entry.getKey())
113+
|| Float.parseFloat(metrics.get(entry.getKey())) > entry.getValue()) {
114+
log.warn("Health metric value over max for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey()));
115+
return getUnhealthyStats(backend);
116+
}
117+
}
118+
return ClusterStats.builder(backend.getName())
119+
.trinoStatus(TrinoStatus.HEALTHY)
120+
.runningQueryCount((int) Float.parseFloat(metrics.get(RUNNING_QUERIES_METRIC)))
121+
.queuedQueryCount((int) Float.parseFloat(metrics.get(QUEUED_QUERIES_METRIC)))
122+
.proxyTo(backend.getProxyTo())
123+
.externalUrl(backend.getExternalUrl())
124+
.routingGroup(backend.getRoutingGroup())
125+
.build();
126+
}
127+
128+
private Map<String, String> getMetrics(String baseUrl, int retriesRemaining)
129+
{
130+
HttpUriBuilder uri = uriBuilderFrom(URI.create(baseUrl)).appendPath(metricsEndpoint);
131+
for (String metric : metricNames) {
132+
uri.addParameter("name[]", metric);
133+
}
134+
135+
Request request = prepareGet()
136+
.setUri(uri.build())
137+
.addHeader(identityHeader.name, identityHeader.value)
138+
.addHeader("Content-Type", "application/openmetrics-text; version=1.0.0; charset=utf-8")
139+
.build();
140+
try {
141+
return httpClient.execute(request, metricsResponseHandler);
142+
}
143+
catch (UnexpectedResponseException e) {
144+
if (shouldRetry(e.getStatusCode())) {
145+
if (retriesRemaining > 0) {
146+
log.warn("Retrying health check on error: %s, ", e.toString());
147+
return getMetrics(baseUrl, retriesRemaining - 1);
148+
}
149+
log.error("Encountered error %s, no retries remaining", e.toString());
150+
}
151+
log.error(e, "Health check failed with non-retryable response.\n%s", e.toString());
152+
}
153+
catch (Exception e) {
154+
log.error(e, "Exception checking %s for health", request.getUri());
155+
}
156+
return ImmutableMap.of();
157+
}
158+
159+
private static class MetricsResponseHandler
160+
implements ResponseHandler<Map<String, String>, RuntimeException>
161+
{
162+
private final ImmutableSet<String> requiredKeys;
163+
164+
public MetricsResponseHandler(Set<String> requiredKeys)
165+
{
166+
this.requiredKeys = ImmutableSet.copyOf(requiredKeys);
167+
}
168+
169+
@Override
170+
public Map<String, String> handleException(Request request, Exception exception)
171+
throws RuntimeException
172+
{
173+
throw propagate(request, exception);
174+
}
175+
176+
@Override
177+
public Map<String, String> handle(Request request, Response response)
178+
throws RuntimeException
179+
{
180+
try {
181+
String responseBody = new String(response.getInputStream().readAllBytes(), UTF_8);
182+
Map<String, String> metrics = Arrays.stream(responseBody.split("\n"))
183+
.filter(line -> !line.startsWith("#"))
184+
.collect(toImmutableMap(s -> s.split(" ")[0], s -> s.split(" ")[1]));
185+
if (!metrics.keySet().containsAll(requiredKeys)) {
186+
throw new UnexpectedResponseException(
187+
format("Request is missing required keys: \n%s\nin response: '%s'", String.join("\n", requiredKeys), responseBody),
188+
request,
189+
response);
190+
}
191+
return metrics;
192+
}
193+
catch (IOException e) {
194+
throw new UnexpectedResponseException(request, response);
195+
}
196+
}
197+
}
198+
199+
private record Header(String name, String value) {}
200+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
17+
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
18+
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
19+
20+
public final class MonitorUtils
21+
{
22+
private MonitorUtils() {}
23+
24+
public static boolean shouldRetry(int statusCode)
25+
{
26+
return switch (statusCode) {
27+
case HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true;
28+
default -> false;
29+
};
30+
}
31+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ public enum ClusterStatsMonitorType
1919
INFO_API,
2020
UI_API,
2121
JDBC,
22-
JMX
22+
JMX,
23+
METRICS
2324
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
*/
1414
package io.trino.gateway.ha.config;
1515

16+
import com.google.common.collect.ImmutableMap;
1617
import io.airlift.units.Duration;
1718
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
1819

20+
import java.util.Map;
21+
1922
import static java.util.concurrent.TimeUnit.SECONDS;
2023

2124
public class MonitorConfiguration
@@ -28,6 +31,13 @@ public class MonitorConfiguration
2831

2932
private boolean explicitPrepare;
3033

34+
private String metricsEndpoint = "/metrics";
35+
36+
// Require 1 node for health by default. This configuration only applies to the ClusterStatsMetricsMonitor
37+
private Map<String, Float> metricMinimumValues = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 1f);
38+
39+
private Map<String, Float> metricMaximumValues = ImmutableMap.of();
40+
3141
public MonitorConfiguration() {}
3242

3343
public int getTaskDelaySeconds()
@@ -69,4 +79,34 @@ public void setExplicitPrepare(boolean explicitPrepare)
6979
{
7080
this.explicitPrepare = explicitPrepare;
7181
}
82+
83+
public String getMetricsEndpoint()
84+
{
85+
return metricsEndpoint;
86+
}
87+
88+
public void setMetricsEndpoint(String metricsEndpoint)
89+
{
90+
this.metricsEndpoint = metricsEndpoint;
91+
}
92+
93+
public Map<String, Float> getMetricMinimumValues()
94+
{
95+
return metricMinimumValues;
96+
}
97+
98+
public void setMetricMinimumValues(Map<String, Float> metricMinimumValues)
99+
{
100+
this.metricMinimumValues = ImmutableMap.copyOf(metricMinimumValues);
101+
}
102+
103+
public Map<String, Float> getMetricMaximumValues()
104+
{
105+
return ImmutableMap.copyOf(metricMaximumValues);
106+
}
107+
108+
public void setMetricMaximumValues(Map<String, Float> metricMaximumValues)
109+
{
110+
this.metricMaximumValues = metricMaximumValues;
111+
}
72112
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2424
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
2525
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
26+
import io.trino.gateway.ha.clustermonitor.ClusterStatsMetricsMonitor;
2627
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2728
import io.trino.gateway.ha.clustermonitor.ClusterStatsObserver;
2829
import io.trino.gateway.ha.clustermonitor.ForMonitor;
@@ -230,6 +231,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
230231
case UI_API -> new ClusterStatsHttpMonitor(configuration.getBackendState());
231232
case JDBC -> new ClusterStatsJdbcMonitor(configuration.getBackendState(), configuration.getMonitor());
232233
case JMX -> new ClusterStatsJmxMonitor(httpClient, configuration.getBackendState());
234+
case METRICS -> new ClusterStatsMetricsMonitor(httpClient, configuration.getBackendState(), configuration.getMonitor());
233235
case NOOP -> new NoopClusterStatsMonitor();
234236
};
235237
}

0 commit comments

Comments
 (0)