2626import java .util .concurrent .ExecutorService ;
2727import java .util .concurrent .Executors ;
2828import java .util .concurrent .Future ;
29+ import java .util .concurrent .ScheduledExecutorService ;
2930import java .util .concurrent .TimeUnit ;
3031
32+ import static java .util .Objects .requireNonNull ;
33+
3134public class ActiveClusterMonitor
3235{
3336 public static final int MONITOR_TASK_DELAY_SECONDS = 60 ;
@@ -39,9 +42,8 @@ public class ActiveClusterMonitor
3942
4043 private final int taskDelaySeconds ;
4144 private final ClusterStatsMonitor clusterStatsMonitor ;
42- private volatile boolean monitorActive = true ;
4345 private final ExecutorService executorService = Executors .newFixedThreadPool (DEFAULT_THREAD_POOL_SIZE );
44- private final ExecutorService singleTaskExecutor = Executors .newSingleThreadExecutor ();
46+ private final ScheduledExecutorService scheduledExecutor = Executors .newSingleThreadScheduledExecutor ();
4547
4648 @ Inject
4749 public ActiveClusterMonitor (
@@ -50,65 +52,54 @@ public ActiveClusterMonitor(
5052 MonitorConfiguration monitorConfiguration ,
5153 ClusterStatsMonitor clusterStatsMonitor )
5254 {
53- this .clusterStatsObservers = clusterStatsObservers ;
54- this .gatewayBackendManager = gatewayBackendManager ;
55+ this .clusterStatsMonitor = requireNonNull (clusterStatsMonitor , "clusterStatsMonitor is null" );
56+ this .clusterStatsObservers = requireNonNull (clusterStatsObservers , "clusterStatsObservers is null" );
57+ this .gatewayBackendManager = requireNonNull (gatewayBackendManager , "gatewayBackendManager is null" );
5558 this .taskDelaySeconds = monitorConfiguration .getTaskDelaySeconds ();
56- this .clusterStatsMonitor = clusterStatsMonitor ;
57- log .info ("Running cluster monitor with connection task delay of %d seconds" , taskDelaySeconds );
5859 }
5960
60- /**
61- * Run an app that queries all active trino clusters for stats.
62- */
6361 @ PostConstruct
6462 public void start ()
6563 {
66- singleTaskExecutor .submit (
67- () -> {
68- while (monitorActive ) {
69- try {
70- log .info ("Getting the stats for the active clusters" );
71- List <ProxyBackendConfiguration > activeClusters =
72- gatewayBackendManager .getAllActiveBackends ();
73- List <Future <ClusterStats >> futures = new ArrayList <>();
74- for (ProxyBackendConfiguration backend : activeClusters ) {
75- Future <ClusterStats > call =
76- executorService .submit (() -> clusterStatsMonitor .monitor (backend ));
77- futures .add (call );
78- }
79- List <ClusterStats > stats = new ArrayList <>();
80- for (Future <ClusterStats > clusterStatsFuture : futures ) {
81- ClusterStats clusterStats = clusterStatsFuture .get ();
82- stats .add (clusterStats );
83- }
64+ log .info ("Running cluster monitor with connection task delay of %d seconds" , taskDelaySeconds );
65+ scheduledExecutor .scheduleAtFixedRate (() -> {
66+ try {
67+ log .info ("Getting stats for all active clusters" );
68+ List <ProxyBackendConfiguration > activeClusters =
69+ gatewayBackendManager .getAllActiveBackends ();
70+ List <Future <ClusterStats >> futures = new ArrayList <>();
71+ for (ProxyBackendConfiguration backend : activeClusters ) {
72+ Future <ClusterStats > call = executorService .submit (() -> clusterStatsMonitor .monitor (backend ));
73+ futures .add (call );
74+ }
75+ List <ClusterStats > stats = new ArrayList <>();
76+ for (Future <ClusterStats > clusterStatsFuture : futures ) {
77+ ClusterStats clusterStats = clusterStatsFuture .get ();
78+ stats .add (clusterStats );
79+ }
8480
85- if (clusterStatsObservers != null ) {
86- for (TrinoClusterStatsObserver observer : clusterStatsObservers ) {
87- observer .observe (stats );
88- }
89- }
90- }
91- catch (Exception e ) {
92- log .error (e , "Error performing backend monitor tasks" );
93- }
94- try {
95- Thread .sleep (TimeUnit .SECONDS .toMillis (taskDelaySeconds ));
96- }
97- catch (Exception e ) {
98- log .error (e , "Error with monitor task" );
99- }
81+ if (clusterStatsObservers != null ) {
82+ for (TrinoClusterStatsObserver observer : clusterStatsObservers ) {
83+ observer .observe (stats );
10084 }
101- });
85+ }
86+ }
87+ catch (Exception e ) {
88+ log .error (e , "Error performing backend monitor tasks" );
89+ }
90+ try {
91+ Thread .sleep (TimeUnit .SECONDS .toMillis (taskDelaySeconds ));
92+ }
93+ catch (Exception e ) {
94+ log .error (e , "Error with monitor task" );
95+ }
96+ }, 0 , taskDelaySeconds , TimeUnit .SECONDS );
10297 }
10398
104- /**
105- * Shut down the app.
106- */
10799 @ PreDestroy
108100 public void stop ()
109101 {
110- this .monitorActive = false ;
111- this .executorService .shutdown ();
112- this .singleTaskExecutor .shutdown ();
102+ executorService .shutdownNow ();
103+ scheduledExecutor .shutdownNow ();
113104 }
114105}
0 commit comments