Skip to content

Commit 23f46c6

Browse files
committed
Add a monitor for the OpenMetrics endpoint
This populates the running and queued query metrics for active load balancing, and allows defining health using minimum and maximum values for arbitrary metrics
1 parent c2ba1ea commit 23f46c6

File tree

9 files changed

+361
-16
lines changed

9 files changed

+361
-16
lines changed

docs/installation.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,38 @@ monitor:
377377

378378
All timeout parameters are optional.
379379

380+
#### METRICS
381+
382+
This pulls statistics from Trino's [OpenMetrics](https://openmetrics.io/) endpoint.
383+
It retrieves the number of running and queued queries for use with
384+
the `QueryCountBasedRouter` (either `METRICS` or `JDBC` must be enabled if
385+
`QueryCountBasedRouter` is used).
386+
387+
This monitor allows customizing health definitions by comparing metrics to fixed
388+
values. This is configured through two maps: `metricMinimumValues` and
389+
`metricMaximumValues`. The keys of these maps are the metric names, and the values
390+
are the minimum or maximum values (inclusive) that are considered healthy. By default,
391+
the only metric populated is:
392+
393+
```yaml
394+
monitorConfiguration:
395+
metricMinimumValues:
396+
trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount: 1
397+
```
398+
399+
This requires the cluster to have at least one active worker node in order to be considered
400+
healthy. The map is overwritten if configured explicitly. For example, to increase the minimum
401+
worker count to 10 and disqualify clusters that have been experiencing frequent major Garbage
402+
Collections, set
403+
404+
```yaml
405+
monitorConfiguration:
406+
metricMinimumValues:
407+
trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount: 10
408+
metricMaximumValues:
409+
io_airlift_stats_name_GcMonitor_MajorGc_FiveMinutes_count: 2
410+
```
411+
380412
#### JDBC
381413

382414
This uses a JDBC connection to query `system.runtime` tables for cluster

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
2828
import static io.airlift.http.client.Request.Builder.prepareGet;
2929
import static io.airlift.json.JsonCodec.jsonCodec;
30-
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
31-
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
32-
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
30+
import static io.trino.gateway.ha.clustermonitor.MonitorUtils.shouldRetry;
3331
import static java.util.Objects.requireNonNull;
3432

3533
public class ClusterStatsInfoApiMonitor
@@ -88,16 +86,4 @@ private TrinoStatus checkStatus(String baseUrl, int retriesRemaining)
8886
}
8987
return TrinoStatus.UNHEALTHY;
9088
}
91-
92-
public static boolean shouldRetry(int statusCode)
93-
{
94-
switch (statusCode) {
95-
case HTTP_BAD_GATEWAY:
96-
case HTTP_UNAVAILABLE:
97-
case HTTP_GATEWAY_TIMEOUT:
98-
return true;
99-
default:
100-
return false;
101-
}
102-
}
10389
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import com.google.common.collect.ImmutableSet;
18+
import io.airlift.http.client.HttpClient;
19+
import io.airlift.http.client.HttpUriBuilder;
20+
import io.airlift.http.client.Request;
21+
import io.airlift.http.client.Response;
22+
import io.airlift.http.client.ResponseHandler;
23+
import io.airlift.http.client.UnexpectedResponseException;
24+
import io.airlift.log.Logger;
25+
import io.trino.gateway.ha.config.BackendStateConfiguration;
26+
import io.trino.gateway.ha.config.MonitorConfiguration;
27+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
28+
import io.trino.gateway.ha.security.util.BasicCredentials;
29+
30+
import java.io.IOException;
31+
import java.net.URI;
32+
import java.util.Arrays;
33+
import java.util.Map;
34+
35+
import static com.google.common.base.Strings.isNullOrEmpty;
36+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
37+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
38+
import static io.airlift.http.client.Request.Builder.prepareGet;
39+
import static io.airlift.http.client.ResponseHandlerUtils.propagate;
40+
import static io.trino.gateway.ha.clustermonitor.MonitorUtils.shouldRetry;
41+
import static java.lang.String.format;
42+
import static java.nio.charset.StandardCharsets.UTF_8;
43+
import static java.util.Objects.requireNonNull;
44+
45+
public class ClusterStatsMetricsMonitor
46+
implements ClusterStatsMonitor
47+
{
48+
public static final String RUNNING_QUERIES_METRIC = "trino_execution_name_QueryManager_RunningQueries";
49+
public static final String QUEUED_QUERIES_METRIC = "trino_execution_name_QueryManager_QueuedQueries";
50+
private static final Logger log = Logger.get(ClusterStatsMetricsMonitor.class);
51+
private final HttpClient httpClient;
52+
private final int retries;
53+
private final MetricsResponseHandler metricsResponseHandler;
54+
private final Header identityHeader;
55+
private final String metricsEndpoint;
56+
private final ImmutableSet<String> metricNames;
57+
private final Map<String, Float> metricMinimumValues;
58+
private final Map<String, Float> metricMaximumValues;
59+
60+
public ClusterStatsMetricsMonitor(HttpClient httpClient, BackendStateConfiguration backendStateConfiguration, MonitorConfiguration monitorConfiguration)
61+
{
62+
this.httpClient = requireNonNull(httpClient, "client is null");
63+
retries = monitorConfiguration.getRetries();
64+
if (!isNullOrEmpty(backendStateConfiguration.getPassword())) {
65+
identityHeader = new Header("Authorization",
66+
new BasicCredentials(backendStateConfiguration.getUsername(), backendStateConfiguration.getPassword()).getBasicAuthHeader());
67+
}
68+
else {
69+
identityHeader = new Header("X-Trino-User", backendStateConfiguration.getUsername());
70+
}
71+
metricsEndpoint = monitorConfiguration.getMetricsEndpoint();
72+
metricMinimumValues = ImmutableMap.copyOf(monitorConfiguration.getMetricMinimumValues());
73+
metricMaximumValues = ImmutableMap.copyOf(monitorConfiguration.getMetricMaximumValues());
74+
metricNames = ImmutableSet.<String>builder()
75+
.add(RUNNING_QUERIES_METRIC, QUEUED_QUERIES_METRIC)
76+
.addAll(metricMinimumValues.keySet())
77+
.addAll(metricMaximumValues.keySet())
78+
.build();
79+
metricsResponseHandler = new MetricsResponseHandler(metricNames);
80+
}
81+
82+
private static ClusterStats getUnhealthyStats(ProxyBackendConfiguration backend)
83+
{
84+
return ClusterStats.builder(backend.getName())
85+
.trinoStatus(TrinoStatus.UNHEALTHY)
86+
.proxyTo(backend.getProxyTo())
87+
.externalUrl(backend.getExternalUrl())
88+
.routingGroup(backend.getRoutingGroup())
89+
.build();
90+
}
91+
92+
@Override
93+
public ClusterStats monitor(ProxyBackendConfiguration backend)
94+
{
95+
Map<String, String> metrics = getMetrics(backend.getProxyTo(), retries);
96+
if (metrics.isEmpty()) {
97+
log.error(format("No metrics available for %s!", backend.getName()));
98+
return getUnhealthyStats(backend);
99+
}
100+
101+
for (Map.Entry<String, Float> entry : metricMinimumValues.entrySet()) {
102+
if (!metrics.containsKey(entry.getKey())
103+
|| Float.parseFloat(metrics.get(entry.getKey())) < entry.getValue()) {
104+
log.warn(format("Health metric value below min for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey())));
105+
return getUnhealthyStats(backend);
106+
}
107+
}
108+
109+
for (Map.Entry<String, Float> entry : metricMaximumValues.entrySet()) {
110+
if (!metrics.containsKey(entry.getKey())
111+
|| Float.parseFloat(metrics.get(entry.getKey())) > entry.getValue()) {
112+
log.warn(format("Health metric value over max for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey())));
113+
return getUnhealthyStats(backend);
114+
}
115+
}
116+
return ClusterStats.builder(backend.getName())
117+
.trinoStatus(TrinoStatus.HEALTHY)
118+
.runningQueryCount((int) Float.parseFloat(metrics.get(RUNNING_QUERIES_METRIC)))
119+
.queuedQueryCount((int) Float.parseFloat(metrics.get(QUEUED_QUERIES_METRIC)))
120+
.proxyTo(backend.getProxyTo())
121+
.externalUrl(backend.getExternalUrl())
122+
.routingGroup(backend.getRoutingGroup())
123+
.build();
124+
}
125+
126+
private Map<String, String> getMetrics(String baseUrl, int retriesRemaining)
127+
{
128+
HttpUriBuilder uri = uriBuilderFrom(URI.create(baseUrl)).appendPath(metricsEndpoint);
129+
for (String metric : metricNames) {
130+
uri.addParameter("name[]", metric);
131+
}
132+
133+
Request request = prepareGet()
134+
.setUri(uri.build())
135+
.addHeader(identityHeader.name, identityHeader.value)
136+
.addHeader("Content-Type", "application/openmetrics-text; version=1.0.0; charset=utf-8")
137+
.build();
138+
try {
139+
return httpClient.execute(request, metricsResponseHandler);
140+
}
141+
catch (UnexpectedResponseException e) {
142+
if (shouldRetry(e.getStatusCode())) {
143+
if (retriesRemaining > 0) {
144+
log.warn("Retrying health check on error: %s, ", e.toString());
145+
return getMetrics(baseUrl, retriesRemaining - 1);
146+
}
147+
log.error("Encountered error %s, no retries remaining", e.toString());
148+
}
149+
log.error(e, "Health check failed with non-retryable response.\n%s", e.toString());
150+
}
151+
catch (Exception e) {
152+
log.error(e, "Exception checking %s for health", request.getUri());
153+
}
154+
return ImmutableMap.of();
155+
}
156+
157+
private static class MetricsResponseHandler
158+
implements ResponseHandler<Map<String, String>, RuntimeException>
159+
{
160+
private final ImmutableSet<String> requiredKeys;
161+
162+
public MetricsResponseHandler(ImmutableSet<String> requiredKeys)
163+
{
164+
this.requiredKeys = requireNonNull(requiredKeys);
165+
}
166+
167+
@Override
168+
public Map<String, String> handleException(Request request, Exception exception)
169+
throws RuntimeException
170+
{
171+
throw propagate(request, exception);
172+
}
173+
174+
@Override
175+
public Map<String, String> handle(Request request, Response response)
176+
throws RuntimeException
177+
{
178+
try {
179+
String responseBody = new String(response.getInputStream().readAllBytes(), UTF_8);
180+
Map<String, String> metrics = Arrays.stream(responseBody.split("\n"))
181+
.filter(s -> !s.startsWith("#"))
182+
.collect(toImmutableMap(s -> s.split(" ")[0], s -> s.split(" ")[1]));
183+
if (!metrics.keySet().containsAll(requiredKeys)) {
184+
throw new UnexpectedResponseException(
185+
format("Request is missing required keys: \n%s\nin response: '%s'", String.join("\n", requiredKeys), responseBody),
186+
request,
187+
response);
188+
}
189+
return metrics;
190+
}
191+
catch (IOException e) {
192+
throw new UnexpectedResponseException(request, response);
193+
}
194+
}
195+
}
196+
197+
private record Header(String name, String value) {}
198+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
17+
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
18+
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
19+
20+
public final class MonitorUtils
21+
{
22+
private MonitorUtils() {}
23+
24+
public static boolean shouldRetry(int statusCode)
25+
{
26+
return switch (statusCode) {
27+
case HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true;
28+
default -> false;
29+
};
30+
}
31+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
METRICS
2223
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
*/
1414
package io.trino.gateway.ha.config;
1515

16+
import com.google.common.collect.ImmutableMap;
1617
import io.airlift.units.Duration;
1718
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
1819

20+
import java.util.Map;
21+
1922
import static java.util.concurrent.TimeUnit.SECONDS;
2023

2124
public class MonitorConfiguration
@@ -28,6 +31,13 @@ public class MonitorConfiguration
2831

2932
private boolean explicitPrepare;
3033

34+
private String metricsEndpoint = "/metrics";
35+
36+
// Require 1 node for health by default. This configuration only applies to the ClusterStatsMetricsMonitor
37+
private Map<String, Float> metricMinimumValues = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 1f);
38+
39+
private Map<String, Float> metricMaximumValues = ImmutableMap.of();
40+
3141
public MonitorConfiguration() {}
3242

3343
public int getTaskDelaySeconds()
@@ -69,4 +79,34 @@ public void setExplicitPrepare(boolean explicitPrepare)
6979
{
7080
this.explicitPrepare = explicitPrepare;
7181
}
82+
83+
public String getMetricsEndpoint()
84+
{
85+
return metricsEndpoint;
86+
}
87+
88+
public void setMetricsEndpoint(String metricsEndpoint)
89+
{
90+
this.metricsEndpoint = metricsEndpoint;
91+
}
92+
93+
public Map<String, Float> getMetricMinimumValues()
94+
{
95+
return metricMinimumValues;
96+
}
97+
98+
public void setMetricMinimumValues(Map<String, Float> metricMinimumValues)
99+
{
100+
this.metricMinimumValues = ImmutableMap.copyOf(metricMinimumValues);
101+
}
102+
103+
public Map<String, Float> getMetricMaximumValues()
104+
{
105+
return ImmutableMap.copyOf(metricMaximumValues);
106+
}
107+
108+
public void setMetricMaximumValues(Map<String, Float> metricMaximumValues)
109+
{
110+
this.metricMaximumValues = metricMaximumValues;
111+
}
72112
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
2121
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2222
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
23+
import io.trino.gateway.ha.clustermonitor.ClusterStatsMetricsMonitor;
2324
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2425
import io.trino.gateway.ha.clustermonitor.ForMonitor;
2526
import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor;
@@ -51,6 +52,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
5152
case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState());
5253
case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor());
5354
case NOOP -> new NoopClusterStatsMonitor();
55+
case METRICS -> new ClusterStatsMetricsMonitor(httpClient, config.getBackendState(), config.getMonitor());
5456
};
5557
}
5658
}

0 commit comments

Comments
 (0)