Skip to content

Commit 1e17148

Browse files
committed
Refactor ActiveClusterMonitor
1 parent abe5c45 commit 1e17148

File tree

1 file changed

+40
-49
lines changed

1 file changed

+40
-49
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.Future;
29+
import java.util.concurrent.ScheduledExecutorService;
2930
import java.util.concurrent.TimeUnit;
3031

32+
import static java.util.Objects.requireNonNull;
33+
3134
public class ActiveClusterMonitor
3235
{
3336
public static final int MONITOR_TASK_DELAY_SECONDS = 60;
@@ -39,9 +42,8 @@ public class ActiveClusterMonitor
3942

4043
private final int taskDelaySeconds;
4144
private final ClusterStatsMonitor clusterStatsMonitor;
42-
private volatile boolean monitorActive = true;
4345
private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
44-
private final ExecutorService singleTaskExecutor = Executors.newSingleThreadExecutor();
46+
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
4547

4648
@Inject
4749
public ActiveClusterMonitor(
@@ -50,65 +52,54 @@ public ActiveClusterMonitor(
5052
MonitorConfiguration monitorConfiguration,
5153
ClusterStatsMonitor clusterStatsMonitor)
5254
{
53-
this.clusterStatsObservers = clusterStatsObservers;
54-
this.gatewayBackendManager = gatewayBackendManager;
55+
this.clusterStatsMonitor = requireNonNull(clusterStatsMonitor, "clusterStatsMonitor is null");
56+
this.clusterStatsObservers = requireNonNull(clusterStatsObservers, "clusterStatsObservers is null");
57+
this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
5558
this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds();
56-
this.clusterStatsMonitor = clusterStatsMonitor;
57-
log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds);
5859
}
5960

60-
/**
61-
* Run an app that queries all active trino clusters for stats.
62-
*/
6361
@PostConstruct
6462
public void start()
6563
{
66-
singleTaskExecutor.submit(
67-
() -> {
68-
while (monitorActive) {
69-
try {
70-
log.info("Getting the stats for the active clusters");
71-
List<ProxyBackendConfiguration> activeClusters =
72-
gatewayBackendManager.getAllActiveBackends();
73-
List<Future<ClusterStats>> futures = new ArrayList<>();
74-
for (ProxyBackendConfiguration backend : activeClusters) {
75-
Future<ClusterStats> call =
76-
executorService.submit(() -> clusterStatsMonitor.monitor(backend));
77-
futures.add(call);
78-
}
79-
List<ClusterStats> stats = new ArrayList<>();
80-
for (Future<ClusterStats> clusterStatsFuture : futures) {
81-
ClusterStats clusterStats = clusterStatsFuture.get();
82-
stats.add(clusterStats);
83-
}
64+
log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds);
65+
scheduledExecutor.scheduleAtFixedRate(() -> {
66+
try {
67+
log.info("Getting stats for all active clusters");
68+
List<ProxyBackendConfiguration> activeClusters =
69+
gatewayBackendManager.getAllActiveBackends();
70+
List<Future<ClusterStats>> futures = new ArrayList<>();
71+
for (ProxyBackendConfiguration backend : activeClusters) {
72+
Future<ClusterStats> call = executorService.submit(() -> clusterStatsMonitor.monitor(backend));
73+
futures.add(call);
74+
}
75+
List<ClusterStats> stats = new ArrayList<>();
76+
for (Future<ClusterStats> clusterStatsFuture : futures) {
77+
ClusterStats clusterStats = clusterStatsFuture.get();
78+
stats.add(clusterStats);
79+
}
8480

85-
if (clusterStatsObservers != null) {
86-
for (TrinoClusterStatsObserver observer : clusterStatsObservers) {
87-
observer.observe(stats);
88-
}
89-
}
90-
}
91-
catch (Exception e) {
92-
log.error(e, "Error performing backend monitor tasks");
93-
}
94-
try {
95-
Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds));
96-
}
97-
catch (Exception e) {
98-
log.error(e, "Error with monitor task");
99-
}
81+
if (clusterStatsObservers != null) {
82+
for (TrinoClusterStatsObserver observer : clusterStatsObservers) {
83+
observer.observe(stats);
10084
}
101-
});
85+
}
86+
}
87+
catch (Exception e) {
88+
log.error(e, "Error performing backend monitor tasks");
89+
}
90+
try {
91+
Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds));
92+
}
93+
catch (Exception e) {
94+
log.error(e, "Error with monitor task");
95+
}
96+
}, 0, taskDelaySeconds, TimeUnit.SECONDS);
10297
}
10398

104-
/**
105-
* Shut down the app.
106-
*/
10799
@PreDestroy
108100
public void stop()
109101
{
110-
this.monitorActive = false;
111-
this.executorService.shutdown();
112-
this.singleTaskExecutor.shutdown();
102+
executorService.shutdownNow();
103+
scheduledExecutor.shutdownNow();
113104
}
114105
}

0 commit comments

Comments
 (0)