Skip to content

Commit d9e5068

Browse files
committed
Add a monitor for the OpenMetrics endpoint. This populates the running and queued query metrics for active load balancing, and allows defining health using minimum and maximum values for arbitrary metrics
1 parent a54cb14 commit d9e5068

File tree

8 files changed

+315
-16
lines changed

8 files changed

+315
-16
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
2828
import static io.airlift.http.client.Request.Builder.prepareGet;
2929
import static io.airlift.json.JsonCodec.jsonCodec;
30-
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
31-
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
32-
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
30+
import static io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor.shouldRetry;
3331
import static java.util.Objects.requireNonNull;
3432

3533
public class ClusterStatsInfoApiMonitor
@@ -88,16 +86,4 @@ private TrinoStatus checkStatus(String baseUrl, int retriesRemaining)
8886
}
8987
return TrinoStatus.UNHEALTHY;
9088
}
91-
92-
public static boolean shouldRetry(int statusCode)
93-
{
94-
switch (statusCode) {
95-
case HTTP_BAD_GATEWAY:
96-
case HTTP_UNAVAILABLE:
97-
case HTTP_GATEWAY_TIMEOUT:
98-
return true;
99-
default:
100-
return false;
101-
}
102-
}
10389
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import com.google.common.collect.ImmutableSet;
18+
import io.airlift.http.client.HttpClient;
19+
import io.airlift.http.client.HttpUriBuilder;
20+
import io.airlift.http.client.Request;
21+
import io.airlift.http.client.Response;
22+
import io.airlift.http.client.ResponseHandler;
23+
import io.airlift.http.client.UnexpectedResponseException;
24+
import io.airlift.log.Logger;
25+
import io.trino.gateway.ha.config.BackendStateConfiguration;
26+
import io.trino.gateway.ha.config.MonitorConfiguration;
27+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
28+
import io.trino.gateway.ha.security.util.BasicCredentials;
29+
30+
import java.io.IOException;
31+
import java.net.URI;
32+
import java.util.Arrays;
33+
import java.util.Map;
34+
35+
import static com.google.common.base.Strings.isNullOrEmpty;
36+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
37+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
38+
import static io.airlift.http.client.Request.Builder.prepareGet;
39+
import static io.airlift.http.client.ResponseHandlerUtils.propagate;
40+
import static io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor.shouldRetry;
41+
import static java.nio.charset.StandardCharsets.UTF_8;
42+
import static java.util.Objects.requireNonNull;
43+
44+
public class ClusterStatsMetricsMonitor
45+
implements ClusterStatsMonitor
46+
{
47+
public static final String RUNNING_QUERIES_METRIC = "trino_execution_name_QueryManager_RunningQueries";
48+
public static final String QUEUED_QUERIES_METRIC = "trino_execution_name_QueryManager_QueuedQueries";
49+
private static final Logger log = Logger.get(ClusterStatsMetricsMonitor.class);
50+
private final HttpClient client;
51+
private final int retries;
52+
private final MetricsResponseHandler metricsResponseHandler;
53+
private final Header identityHeader;
54+
private final String metricsEndpoint;
55+
private final ImmutableSet<String> metricNames;
56+
private final Map<String, Float> metricMinimumValues;
57+
private final Map<String, Float> metricMaximumValues;
58+
59+
public ClusterStatsMetricsMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration, MonitorConfiguration monitorConfiguration)
60+
{
61+
this.client = requireNonNull(client, "client is null");
62+
retries = monitorConfiguration.getRetries();
63+
if (!isNullOrEmpty(backendStateConfiguration.getPassword())) {
64+
identityHeader = new Header("Authorization",
65+
new BasicCredentials(backendStateConfiguration.getUsername(), backendStateConfiguration.getPassword()).getBasicAuthHeader());
66+
}
67+
else {
68+
identityHeader = new Header("X-Trino-User", backendStateConfiguration.getUsername());
69+
}
70+
metricsEndpoint = monitorConfiguration.getMetricsEndpoint();
71+
metricMinimumValues = monitorConfiguration.getMetricMinimumValues();
72+
metricMaximumValues = monitorConfiguration.getMetricMaximumValues();
73+
metricNames = ImmutableSet.<String>builder()
74+
.add(RUNNING_QUERIES_METRIC, QUEUED_QUERIES_METRIC)
75+
.addAll(metricMinimumValues.keySet())
76+
.addAll(metricMaximumValues.keySet())
77+
.build();
78+
metricsResponseHandler = new MetricsResponseHandler(metricNames);
79+
}
80+
81+
private ClusterStats getUnhealthyStats(ProxyBackendConfiguration backend)
82+
{
83+
return ClusterStats.builder(backend.getName())
84+
.trinoStatus(TrinoStatus.UNHEALTHY)
85+
.proxyTo(backend.getProxyTo())
86+
.externalUrl(backend.getExternalUrl())
87+
.routingGroup(backend.getRoutingGroup())
88+
.build();
89+
}
90+
91+
@Override
92+
public ClusterStats monitor(ProxyBackendConfiguration backend)
93+
{
94+
Map<String, String> metrics = getMetrics(backend.getProxyTo(), retries);
95+
if (metrics.isEmpty()) {
96+
log.error(String.format("No metrics available for %s!", backend.getName()));
97+
return getUnhealthyStats(backend);
98+
}
99+
100+
for (Map.Entry<String, Float> entry : metricMinimumValues.entrySet()) {
101+
if (!metrics.containsKey(entry.getKey())
102+
|| Float.parseFloat(metrics.get(entry.getKey())) < entry.getValue()) {
103+
log.warn(String.format("Health metric value below min for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey())));
104+
return getUnhealthyStats(backend);
105+
}
106+
}
107+
108+
for (Map.Entry<String, Float> entry : metricMaximumValues.entrySet()) {
109+
if (!metrics.containsKey(entry.getKey())
110+
|| Float.parseFloat(metrics.get(entry.getKey())) > entry.getValue()) {
111+
log.warn(String.format("Health metric value over max for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey())));
112+
return getUnhealthyStats(backend);
113+
}
114+
}
115+
return ClusterStats.builder(backend.getName())
116+
.trinoStatus(TrinoStatus.HEALTHY)
117+
.runningQueryCount((int) Float.parseFloat(metrics.get(RUNNING_QUERIES_METRIC)))
118+
.queuedQueryCount((int) Float.parseFloat(metrics.get(QUEUED_QUERIES_METRIC)))
119+
.proxyTo(backend.getProxyTo())
120+
.externalUrl(backend.getExternalUrl())
121+
.routingGroup(backend.getRoutingGroup())
122+
.build();
123+
}
124+
125+
private Map<String, String> getMetrics(String baseUrl, int retriesRemaining)
126+
{
127+
HttpUriBuilder uri = uriBuilderFrom(URI.create(baseUrl)).appendPath(metricsEndpoint);
128+
for (String metric : metricNames) {
129+
uri.addParameter("name[]", metric);
130+
}
131+
132+
Request request = prepareGet()
133+
.setUri(uri.build())
134+
.addHeader(identityHeader.name, identityHeader.value)
135+
.addHeader("Content-Type", "application/openmetrics-text; version=1.0.0; charset=utf-8")
136+
.build();
137+
try {
138+
return client.execute(request, metricsResponseHandler);
139+
}
140+
catch (UnexpectedResponseException e) {
141+
if (shouldRetry(e.getStatusCode())) {
142+
if (retriesRemaining > 0) {
143+
log.warn("Retrying health check on error: %s, ", e.toString());
144+
return getMetrics(baseUrl, retriesRemaining - 1);
145+
}
146+
else {
147+
log.error("Encountered error %s, no retries remaining", e.toString());
148+
}
149+
}
150+
else {
151+
log.error(e, "Health check failed with non-retryable response. %s\n%s", e.getMessage(), e.toString());
152+
}
153+
}
154+
catch (Exception e) {
155+
log.error(e, "Exception checking %s for health", request.getUri());
156+
}
157+
return ImmutableMap.of();
158+
}
159+
160+
private static class MetricsResponseHandler
161+
implements ResponseHandler<Map<String, String>, RuntimeException>
162+
{
163+
private final ImmutableSet<String> requiredKeys;
164+
165+
public MetricsResponseHandler(ImmutableSet<String> requiredKeys)
166+
{
167+
this.requiredKeys = requireNonNull(requiredKeys);
168+
}
169+
170+
@Override
171+
public Map<String, String> handleException(Request request, Exception exception)
172+
throws RuntimeException
173+
{
174+
throw propagate(request, exception);
175+
}
176+
177+
@Override
178+
public Map<String, String> handle(Request request, Response response)
179+
throws RuntimeException
180+
{
181+
try {
182+
String responseBody = new String(response.getInputStream().readAllBytes(), UTF_8);
183+
Map<String, String> metrics = Arrays.stream(responseBody.split("\n"))
184+
.filter(s -> !s.startsWith("#"))
185+
.collect(toImmutableMap(s -> s.split(" ")[0], s -> s.split(" ")[1]));
186+
if (!metrics.keySet().containsAll(requiredKeys)) {
187+
throw new UnexpectedResponseException(
188+
String.format("Request is missing required keys: \n%s\nin response: '%s'", String.join("\n", requiredKeys), responseBody),
189+
request,
190+
response);
191+
}
192+
return metrics;
193+
}
194+
catch (IOException e) {
195+
throw new UnexpectedResponseException(request, response);
196+
}
197+
}
198+
}
199+
200+
private record Header(String name, String value) {}
201+
}

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMonitor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515

1616
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
1717

18+
import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
19+
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
20+
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
21+
1822
public interface ClusterStatsMonitor
1923
{
2024
ClusterStats monitor(ProxyBackendConfiguration backend);
@@ -27,4 +31,12 @@ static ClusterStats.Builder getClusterStatsBuilder(ProxyBackendConfiguration bac
2731
builder.routingGroup(backend.getRoutingGroup());
2832
return builder;
2933
}
34+
35+
static boolean shouldRetry(int statusCode)
36+
{
37+
return switch (statusCode) {
38+
case HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true;
39+
default -> false;
40+
};
41+
}
3042
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
1818
NOOP,
1919
INFO_API,
2020
UI_API,
21-
JDBC
21+
JDBC,
22+
METRICS
2223
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,24 @@
1313
*/
1414
package io.trino.gateway.ha.config;
1515

16+
import com.google.common.collect.ImmutableMap;
1617
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
1718

19+
import java.util.Map;
20+
1821
public class MonitorConfiguration
1922
{
2023
private int taskDelaySeconds = ActiveClusterMonitor.MONITOR_TASK_DELAY_SECONDS;
2124

2225
private int retries;
2326

27+
private String metricsEndpoint = "/metrics";
28+
29+
// Require 1 node for health by default. This configuration only applies to the ClusterStatsMetricsMonitor
30+
private Map<String, Float> metricMinimumValues = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 1f);
31+
32+
private Map<String, Float> metricMaximumValues = ImmutableMap.of();
33+
2434
public MonitorConfiguration() {}
2535

2636
public int getTaskDelaySeconds()
@@ -42,4 +52,34 @@ public void setRetries(int retries)
4252
{
4353
this.retries = retries;
4454
}
55+
56+
public String getMetricsEndpoint()
57+
{
58+
return metricsEndpoint;
59+
}
60+
61+
public void setMetricsEndpoint(String metricsEndpoint)
62+
{
63+
this.metricsEndpoint = metricsEndpoint;
64+
}
65+
66+
public Map<String, Float> getMetricMinimumValues()
67+
{
68+
return metricMinimumValues;
69+
}
70+
71+
public void setMetricMinimumValues(Map<String, Float> metricMinimumValues)
72+
{
73+
this.metricMinimumValues = metricMinimumValues;
74+
}
75+
76+
public Map<String, Float> getMetricMaximumValues()
77+
{
78+
return metricMaximumValues;
79+
}
80+
81+
public void setMetricMaximumValues(Map<String, Float> metricMaximumValues)
82+
{
83+
this.metricMaximumValues = metricMaximumValues;
84+
}
4585
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
2121
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
2222
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
23+
import io.trino.gateway.ha.clustermonitor.ClusterStatsMetricsMonitor;
2324
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
2425
import io.trino.gateway.ha.clustermonitor.ForMonitor;
2526
import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor;
@@ -51,6 +52,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
5152
case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState());
5253
case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor());
5354
case NOOP -> new NoopClusterStatsMonitor();
55+
case METRICS -> new ClusterStatsMetricsMonitor(httpClient, config.getBackendState(), config.getMonitor());
5456
};
5557
}
5658
}

gateway-ha/src/main/java/io/trino/gateway/ha/security/util/BasicCredentials.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public static BasicCredentials extractBasicAuthCredentials(ContainerRequestConte
4444
return extractBasicAuthCredentials(header);
4545
}
4646

47+
public String getBasicAuthHeader()
48+
{
49+
return String.format("Basic %s", encodeCredentials());
50+
}
51+
4752
public static BasicCredentials extractBasicAuthCredentials(String header)
4853
throws AuthenticationException
4954
{
@@ -78,4 +83,9 @@ private static String decodeCredentials(String credentials)
7883
throw new AuthenticationException("Invalid base64 encoded credentials");
7984
}
8085
}
86+
87+
private String encodeCredentials()
88+
{
89+
return Base64.getEncoder().encodeToString(String.format("%s:%s", username, password).getBytes(ISO_8859_1));
90+
}
8191
}

0 commit comments

Comments
 (0)