Skip to content

Commit 420f6e0

Browse files
amybubuebyhr
authored and committed
Add cluster activation status metric and emit to v1/jmx
1 parent b662308 commit 420f6e0

File tree

6 files changed

+315
-0
lines changed

6 files changed

+315
-0
lines changed

gateway-ha/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ clusterStatsConfiguration:
1919
# This can be adjusted based on the coordinator state
2020
monitor:
2121
taskDelay: 1m
22+
clusterMetricsRegistryRefreshPeriod: 30s

gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.inject.Scopes;
2020
import io.airlift.log.Logger;
2121
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
22+
import io.trino.gateway.ha.clustermonitor.ClusterMetricsStatsExporter;
2223
import io.trino.gateway.ha.clustermonitor.ForMonitor;
2324
import io.trino.gateway.ha.config.HaGatewayConfiguration;
2425
import io.trino.gateway.ha.handler.ProxyHandlerStats;
@@ -146,6 +147,7 @@ public void configure(Binder binder)
146147
binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON);
147148
newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName();
148149
binder.bind(RoutingRulesManager.class);
150+
binder.bind(ClusterMetricsStatsExporter.class).in(Scopes.SINGLETON);
149151
}
150152

151153
private static void addManagedApps(HaGatewayConfiguration configuration, Binder binder)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import io.trino.gateway.ha.router.GatewayBackendManager;
17+
import org.weakref.jmx.Managed;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
public class ClusterMetricsStats
22+
{
23+
private final String clusterName;
24+
private final GatewayBackendManager gatewayBackendManager;
25+
26+
public ClusterMetricsStats(String clusterName, GatewayBackendManager gatewayBackendManager)
27+
{
28+
this.clusterName = requireNonNull(clusterName, "clusterName is null");
29+
this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
30+
}
31+
32+
public String getClusterName()
33+
{
34+
return clusterName;
35+
}
36+
37+
@Managed
38+
public int getActivationStatus()
39+
{
40+
return gatewayBackendManager.getBackendByName(clusterName)
41+
.map(cluster -> cluster.isActive() ? 1 : 0)
42+
.orElse(-1);
43+
}
44+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.inject.Inject;
18+
import com.google.inject.Singleton;
19+
import io.airlift.log.Logger;
20+
import io.airlift.units.Duration;
21+
import io.trino.gateway.ha.config.MonitorConfiguration;
22+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
23+
import io.trino.gateway.ha.router.GatewayBackendManager;
24+
import jakarta.annotation.PostConstruct;
25+
import jakarta.annotation.PreDestroy;
26+
import org.weakref.jmx.MBeanExporter;
27+
28+
import java.util.HashMap;
29+
import java.util.HashSet;
30+
import java.util.Map;
31+
import java.util.Set;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.stream.Collectors;
35+
36+
import static java.util.Objects.requireNonNull;
37+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
38+
39+
@Singleton
40+
public class ClusterMetricsStatsExporter
41+
implements AutoCloseable
42+
{
43+
private static final Logger log = Logger.get(ClusterMetricsStatsExporter.class);
44+
45+
private final MBeanExporter exporter;
46+
private final GatewayBackendManager gatewayBackendManager;
47+
private final Duration refreshInterval;
48+
// MBeanExporter uses weak references, so clustersStats Map is needed to maintain strong references to metric objects to prevent garbage collection
49+
private final Map<String, ClusterMetricsStats> clustersStats = new HashMap<>();
50+
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
51+
52+
@Inject
53+
public ClusterMetricsStatsExporter(GatewayBackendManager gatewayBackendManager, MBeanExporter exporter, MonitorConfiguration monitorConfiguration)
54+
{
55+
this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
56+
this.exporter = requireNonNull(exporter, "exporter is null");
57+
this.refreshInterval = monitorConfiguration.getClusterMetricsRegistryRefreshPeriod();
58+
}
59+
60+
@PostConstruct
61+
public void start()
62+
{
63+
log.debug("Running periodic metric refresh with interval of %s", refreshInterval);
64+
scheduledExecutor.scheduleAtFixedRate(() -> {
65+
try {
66+
updateClustersMetricRegistry();
67+
}
68+
catch (Exception e) {
69+
log.error(e, "Error refreshing cluster metrics");
70+
}
71+
}, 0, refreshInterval.toMillis(), MILLISECONDS);
72+
}
73+
74+
@PreDestroy
75+
public void stop()
76+
{
77+
scheduledExecutor.shutdownNow();
78+
}
79+
80+
private synchronized void updateClustersMetricRegistry()
81+
{
82+
// Get current clusters from DB
83+
Set<String> currentClusters = gatewayBackendManager.getAllBackends().stream()
84+
.map(ProxyBackendConfiguration::getName)
85+
.collect(Collectors.toSet());
86+
87+
// Create a copy of keys to avoid concurrent modification
88+
Set<String> registeredClusters = new HashSet<>(clustersStats.keySet());
89+
90+
// Unregister metrics for removed clusters
91+
for (String registeredCluster : registeredClusters) {
92+
if (!currentClusters.contains(registeredCluster)) {
93+
try {
94+
exporter.unexportWithGeneratedName(ClusterMetricsStats.class, registeredCluster);
95+
log.debug("Unregistered metrics for removed cluster: %s", registeredCluster);
96+
clustersStats.remove(registeredCluster);
97+
}
98+
catch (Exception e) {
99+
log.error(e, "Failed to unregister metrics for cluster: %s", registeredCluster);
100+
}
101+
}
102+
}
103+
104+
// Register metrics for added clusters
105+
for (String cluster : currentClusters) {
106+
if (!clustersStats.containsKey(cluster)) {
107+
registerClusterMetrics(cluster);
108+
}
109+
}
110+
}
111+
112+
private synchronized void registerClusterMetrics(String clusterName)
113+
{
114+
ClusterMetricsStats stats = new ClusterMetricsStats(clusterName, gatewayBackendManager);
115+
116+
if (clustersStats.putIfAbsent(clusterName, stats) == null) { // null means the stats didn't exist previously and was inserted
117+
try {
118+
exporter.exportWithGeneratedName(stats, ClusterMetricsStats.class, clusterName);
119+
log.debug("Registered metrics for cluster: %s", clusterName);
120+
}
121+
catch (Exception e) {
122+
clustersStats.remove(clusterName);
123+
log.error(e, "Failed to register metrics for cluster: %s", clusterName);
124+
}
125+
}
126+
else {
127+
log.warn("Attempted to register metrics for duplicate cluster name: %s. This may cause JMX registration issues.", clusterName);
128+
}
129+
}
130+
131+
@VisibleForTesting
132+
GatewayBackendManager gatewayBackendManager()
133+
{
134+
return gatewayBackendManager;
135+
}
136+
137+
@VisibleForTesting
138+
MBeanExporter exporter()
139+
{
140+
return exporter;
141+
}
142+
143+
@Override
144+
public void close()
145+
{
146+
stop();
147+
}
148+
}

gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class MonitorConfiguration
2929

3030
private Duration queryTimeout = new Duration(10, SECONDS);
3131

32+
private Duration clusterMetricsRegistryRefreshPeriod = new Duration(30, SECONDS);
33+
3234
private boolean explicitPrepare;
3335

3436
private String metricsEndpoint = "/metrics";
@@ -133,4 +135,14 @@ public void setMetricMaximumValues(Map<String, Float> metricMaximumValues)
133135
{
134136
this.metricMaximumValues = metricMaximumValues;
135137
}
138+
139+
/**
 * Returns the configured refresh period for the cluster metrics registry.
 *
 * @return the value of the {@code clusterMetricsRegistryRefreshPeriod} field
 */
public Duration getClusterMetricsRegistryRefreshPeriod()
140+
{
141+
return clusterMetricsRegistryRefreshPeriod;
142+
}
143+
144+
/**
 * Sets the refresh period for the cluster metrics registry.
 * The value is stored as given; no validation is performed here.
 *
 * @param clusterMetricsRegistryRefreshPeriod the new refresh period
 */
public void setClusterMetricsRegistryRefreshPeriod(Duration clusterMetricsRegistryRefreshPeriod)
145+
{
146+
this.clusterMetricsRegistryRefreshPeriod = clusterMetricsRegistryRefreshPeriod;
147+
}
136148
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.clustermonitor;
15+
16+
import io.airlift.units.Duration;
17+
import io.trino.gateway.ha.config.MonitorConfiguration;
18+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
19+
import io.trino.gateway.ha.router.GatewayBackendManager;
20+
import org.junit.jupiter.api.Test;
21+
import org.weakref.jmx.MBeanExporter;
22+
23+
import java.util.List;
24+
25+
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
26+
import static java.util.concurrent.TimeUnit.SECONDS;
27+
import static org.mockito.ArgumentMatchers.argThat;
28+
import static org.mockito.ArgumentMatchers.eq;
29+
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.verify;
31+
import static org.mockito.Mockito.when;
32+
33+
final class TestClusterMetricsStatsExporter
34+
{
35+
@Test
36+
void testMetricsRegistrationForNewCluster()
37+
{
38+
try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) {
39+
String clusterName1 = "test-cluster1";
40+
ProxyBackendConfiguration cluster1 = createTestCluster(clusterName1);
41+
String clusterName2 = "test-cluster2";
42+
ProxyBackendConfiguration cluster2 = createTestCluster(clusterName2);
43+
when(statsExporter.gatewayBackendManager().getAllBackends())
44+
.thenReturn(List.of(cluster1)) // First return with 1 cluster
45+
.thenReturn(List.of(cluster1, cluster2)); // Then return with 2 clusters to simulate addition
46+
47+
statsExporter.start();
48+
sleepUninterruptibly(2, SECONDS);
49+
50+
verify(statsExporter.exporter()).exportWithGeneratedName(
51+
argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName1)),
52+
eq(ClusterMetricsStats.class), eq(clusterName1));
53+
54+
// Wait for next update where cluster is added
55+
sleepUninterruptibly(2, SECONDS);
56+
57+
verify(statsExporter.exporter()).exportWithGeneratedName(
58+
argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName2)),
59+
eq(ClusterMetricsStats.class), eq(clusterName2));
60+
}
61+
}
62+
63+
@Test
64+
public void testMetricsUnregistrationForRemovedCluster()
65+
{
66+
try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) {
67+
String clusterName = "test-cluster";
68+
ProxyBackendConfiguration cluster = createTestCluster(clusterName);
69+
when(statsExporter.gatewayBackendManager().getAllBackends())
70+
.thenReturn(List.of(cluster)) // First return with cluster
71+
.thenReturn(List.of()); // Then return empty list to simulate removal
72+
73+
statsExporter.start();
74+
sleepUninterruptibly(2, SECONDS);
75+
76+
verify(statsExporter.exporter()).exportWithGeneratedName(
77+
argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName)),
78+
eq(ClusterMetricsStats.class), eq(clusterName));
79+
80+
// Wait for next update where cluster is removed
81+
sleepUninterruptibly(2, SECONDS);
82+
83+
verify(statsExporter.exporter()).unexportWithGeneratedName(eq(ClusterMetricsStats.class), eq(clusterName));
84+
}
85+
}
86+
87+
private static ProxyBackendConfiguration createTestCluster(String name)
88+
{
89+
ProxyBackendConfiguration cluster = new ProxyBackendConfiguration();
90+
cluster.setName(name);
91+
return cluster;
92+
}
93+
94+
private static ClusterMetricsStatsExporter createStatsExporter()
95+
{
96+
GatewayBackendManager gatewayBackendManager = mock(GatewayBackendManager.class);
97+
MBeanExporter exporter = mock(MBeanExporter.class);
98+
MonitorConfiguration monitorConfiguration = mock(MonitorConfiguration.class);
99+
100+
when(monitorConfiguration.getClusterMetricsRegistryRefreshPeriod())
101+
.thenReturn(new Duration(1, SECONDS));
102+
103+
return new ClusterMetricsStatsExporter(
104+
gatewayBackendManager,
105+
exporter,
106+
monitorConfiguration);
107+
}
108+
}

0 commit comments

Comments
 (0)