Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.http.client.HttpClient;
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
Expand Down Expand Up @@ -82,7 +83,6 @@ public class HaGatewayProviderModule
private final LbOAuthManager oauthManager;
private final LbFormAuthManager formAuthManager;
private final AuthorizationManager authorizationManager;
private final BackendStateManager backendStateConnectionManager;
private final ResourceSecurityDynamicFeature resourceSecurityDynamicFeature;
private final HaGatewayConfiguration configuration;
private final ResourceGroupsManager resourceGroupsManager;
Expand All @@ -96,6 +96,7 @@ protected void configure()
binder().bind(ResourceGroupsManager.class).toInstance(resourceGroupsManager);
binder().bind(GatewayBackendManager.class).toInstance(gatewayBackendManager);
binder().bind(QueryHistoryManager.class).toInstance(queryHistoryManager);
binder().bind(BackendStateManager.class).in(Scopes.SINGLETON);
}

public HaGatewayProviderModule(HaGatewayConfiguration configuration)
Expand All @@ -108,7 +109,6 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration)

authorizationManager = new AuthorizationManager(configuration.getAuthorization(), presetUsers);
resourceSecurityDynamicFeature = getAuthFilter(configuration);
backendStateConnectionManager = new BackendStateManager();

GatewayCookieConfigurationPropertiesProvider gatewayCookieConfigurationPropertiesProvider = GatewayCookieConfigurationPropertiesProvider.getInstance();
gatewayCookieConfigurationPropertiesProvider.initialize(configuration.getGatewayCookieConfiguration());
Expand Down Expand Up @@ -206,13 +206,6 @@ public AuthorizationManager getAuthorizationManager()
return this.authorizationManager;
}

@Provides
@Singleton
public BackendStateManager getBackendStateConnectionManager()
{
return this.backendStateConnectionManager;
}

@Provides
@Singleton
public RoutingGroupSelector getRoutingGroupSelector(@ForRouter HttpClient httpClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,29 @@
*/
package io.trino.gateway.ha.router;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;

import java.util.HashMap;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class BackendStateManager
{
private final Map<String, ClusterStats> clusterStats;
private final MBeanExporter exporter;
private final Map<String, ClusterStats> clusterStats = new HashMap<>();
private final Map<String, ClusterStatsJMX> clusterStatsJMXs = new HashMap<>();

public BackendStateManager()
@Inject
public BackendStateManager(MBeanExporter exporter)
{
this.clusterStats = new HashMap<>();
this.exporter = requireNonNull(exporter, "exporter is null");
}

public ClusterStats getBackendState(ProxyBackendConfiguration backend)
Expand All @@ -36,6 +46,83 @@ public ClusterStats getBackendState(ProxyBackendConfiguration backend)

public void updateStates(String clusterId, ClusterStats stats)
{
if (!clusterStatsJMXs.containsKey(clusterId)) {
ClusterStatsJMX clusterStatsJMX = new ClusterStatsJMX(stats);
exporter.exportWithGeneratedName(
clusterStatsJMX,
ClusterStatsJMX.class,
ImmutableMap.<String, String>builder()
.put("name", "ClusterStats")
.put("cluster_id", clusterId)
.build());
clusterStatsJMXs.put(clusterId, clusterStatsJMX);
}
else {
clusterStatsJMXs.get(clusterId).updateFrom(stats);
}
clusterStats.put(clusterId, stats);
}

public static class ClusterStatsJMX
{
private int runningQueryCount;
private int queuedQueryCount;
private int numWorkerNodes;
private TrinoStatus trinoStatus;

public ClusterStatsJMX(ClusterStats clusterStats)
{
updateFrom(clusterStats);
}

public void updateFrom(ClusterStats clusterStats)
{
runningQueryCount = clusterStats.runningQueryCount();
queuedQueryCount = clusterStats.queuedQueryCount();
numWorkerNodes = clusterStats.numWorkerNodes();
trinoStatus = clusterStats.trinoStatus();
}

@Managed
public int getRunningQueryCount()
{
return runningQueryCount;
}

@Managed
public int getQueuedQueryCount()
{
return queuedQueryCount;
}

@Managed
public int getNumWorkerNodes()
{
return numWorkerNodes;
}

@Managed
public int getTrinoStatusPending()
{
return trinoStatus == TrinoStatus.PENDING ? 1 : 0;
}

@Managed
public int getTrinoStatusHealthy()
{
return trinoStatus == TrinoStatus.HEALTHY ? 1 : 0;
}

@Managed
public int getTrinoStatusUnhealthy()
{
return trinoStatus == TrinoStatus.UNHEALTHY ? 1 : 0;
}

@Managed
public int getTrinoStatusUnknown()
{
return trinoStatus == TrinoStatus.UNKNOWN ? 1 : 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,20 @@ void testHealthCheckEndpoints()
throw new IllegalStateException("Trino Gateway health check failed");
}

@Test
void testClusterStatsJMX()
throws Exception
{
Request request = new Request.Builder()
.url("http://localhost:" + routerPort + "/metrics")
.get()
.build();
Response response = httpClient.newCall(request).execute();
String body = response.body().string();
assertThat(body).contains("trino1_TrinoStatusHealthy");
assertThat(body).contains("trino2_TrinoStatusHealthy");
}

@AfterAll
void cleanup()
{
Expand Down