diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 7e9c24172..8fb74ccd2 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -104,6 +104,12 @@ + + com.zaxxer + HikariCP + 7.0.2 + + io.airlift aircompressor-v3 diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java index cd04c5335..81164e99f 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java @@ -21,8 +21,9 @@ public class DataStoreConfiguration private String driver; private Integer queryHistoryHoursRetention = 4; private boolean runMigrationsEnabled = true; + private Integer maxPoolSize; - public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled) + public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize) { this.jdbcUrl = jdbcUrl; this.user = user; @@ -30,6 +31,22 @@ public DataStoreConfiguration(String jdbcUrl, String user, String password, Stri this.driver = driver; this.queryHistoryHoursRetention = queryHistoryHoursRetention; this.runMigrationsEnabled = runMigrationsEnabled; + this.maxPoolSize = maxPoolSize; + } + + public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled) + { + this(jdbcUrl, user, password, driver, queryHistoryHoursRetention, runMigrationsEnabled, null); + } + + public Integer getMaxPoolSize() + { + return this.maxPoolSize; + } + + public void setMaxPoolSize(Integer maxPoolSize) + { + this.maxPoolSize = maxPoolSize; } public DataStoreConfiguration() {} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java index 2973655b8..e56dc81cb 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java @@ -90,6 +90,7 @@ public class HaGatewayProviderModule private final GatewayBackendManager gatewayBackendManager; private final QueryHistoryManager queryHistoryManager; private final PathFilter pathFilter; + private final JdbcConnectionManager connectionManager; @Override protected void configure() @@ -100,6 +101,7 @@ protected void configure() binder().bind(QueryHistoryManager.class).toInstance(queryHistoryManager); binder().bind(BackendStateManager.class).in(Scopes.SINGLETON); binder().bind(PathFilter.class).toInstance(pathFilter); + binder().bind(JdbcConnectionManager.class).toInstance(connectionManager); } public HaGatewayProviderModule(HaGatewayConfiguration configuration) @@ -121,8 +123,8 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration) oAuth2GatewayCookieConfigurationPropertiesProvider.initialize(configuration.getOauth2GatewayCookieConfiguration()); Jdbi jdbi = Jdbi.create(configuration.getDataStore().getJdbcUrl(), configuration.getDataStore().getUser(), configuration.getDataStore().getPassword()); - JdbcConnectionManager connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore()); - resourceGroupsManager = new HaResourceGroupsManager(connectionManager); + this.connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore()); + resourceGroupsManager = new HaResourceGroupsManager(this.connectionManager); gatewayBackendManager = new HaGatewayManager(jdbi, configuration.getRouting()); queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle")); } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java index 2506b3f2a..0f4ce9c95 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java @@ -14,16 +14,21 @@ package io.trino.gateway.ha.persistence; import com.google.common.annotations.VisibleForTesting; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import io.airlift.log.Logger; import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.persistence.dao.QueryHistoryDao; import jakarta.annotation.Nullable; +import jakarta.annotation.PreDestroy; import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -39,6 +44,8 @@ public class JdbcConnectionManager private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private final Map pools = new ConcurrentHashMap<>(); + public JdbcConnectionManager(Jdbi jdbi, DataStoreConfiguration configuration) { this.jdbi = requireNonNull(jdbi, "jdbi is null") @@ -59,7 +66,18 @@ public Jdbi getJdbi(@Nullable String routingGroupDatabase) return jdbi; } - return Jdbi.create(buildJdbcUrl(routingGroupDatabase), configuration.getUser(), configuration.getPassword()) + Integer maxPoolSize = configuration.getMaxPoolSize(); + if (maxPoolSize != null && maxPoolSize > 0) { + HikariDataSource ds = getOrCreateDataSource(routingGroupDatabase, maxPoolSize); + return Jdbi.create(ds) + .installPlugin(new SqlObjectPlugin()) + .registerRowMapper(new RecordAndAnnotatedConstructorMapper()); + } + + return Jdbi.create( + buildJdbcUrl(routingGroupDatabase), + configuration.getUser(), + configuration.getPassword()) .installPlugin(new SqlObjectPlugin()) .registerRowMapper(new RecordAndAnnotatedConstructorMapper()); } @@ -107,11 +125,52 @@ private void startCleanUps() executorService.scheduleWithFixedDelay( () -> { log.info("Performing query history cleanup task"); - long created = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention()); + long created = System.currentTimeMillis() + - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention()); jdbi.onDemand(QueryHistoryDao.class).deleteOldHistory(created); }, 1, 120, TimeUnit.MINUTES); } + + private HikariDataSource getOrCreateDataSource(String routingGroupDatabase, int maxPoolSize) + { + return pools.compute(routingGroupDatabase, (key, existing) -> { + if (existing != null && !existing.isClosed()) { + return existing; + } + + HikariConfig cfg = new HikariConfig(); + cfg.setJdbcUrl(buildJdbcUrl(key)); + cfg.setUsername(configuration.getUser()); + cfg.setPassword(configuration.getPassword()); + if (configuration.getDriver() != null) { + cfg.setDriverClassName(configuration.getDriver()); + } + cfg.setMaximumPoolSize(maxPoolSize); + cfg.setPoolName("gateway-ha-" + key); + + return new HikariDataSource(cfg); + }); + } + + @PreDestroy + public void close() + { + for (Map.Entry e : pools.entrySet()) { + HikariDataSource ds = e.getValue(); + if (ds != null && !ds.isClosed()) { + try { + ds.close(); + } + catch (RuntimeException ex) { + log.warn(ex, "Failed to close datasource for key: %s", e.getKey()); + } + } + } + pools.clear(); + + executorService.shutdownNow(); + } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/persistence/TestJdbcConnectionManagerPool.java b/gateway-ha/src/test/java/io/trino/gateway/ha/persistence/TestJdbcConnectionManagerPool.java new file mode 100644 index 000000000..4bc557eab --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/persistence/TestJdbcConnectionManagerPool.java @@ -0,0 +1,175 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.persistence; + +import io.trino.gateway.ha.config.DataStoreConfiguration; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +final class TestJdbcConnectionManagerPool +{ + @Test + void blocksWhenExceedingMaxPoolSize() + throws Exception + { + String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-pool-" + System.currentTimeMillis()).toString(); + String jdbcUrl = "jdbc:h2:" + dbPath; + + DataStoreConfiguration cfg = new DataStoreConfiguration( + jdbcUrl, "sa", "sa", "org.h2.Driver", + 4, true, + 2); + + JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg); + Jdbi jdbi = cm.getJdbi("testdb"); + + try (ExecutorService es = Executors.newFixedThreadPool(3)) { + List> acquired = new ArrayList<>(); + + CountDownLatch hold = new CountDownLatch(1); + CountDownLatch acquiredLatch = new CountDownLatch(2); + + // Open exactly maxPoolSize connections and keep them open + for (int i = 0; i < 2; i++) { + acquired.add(es.submit(() -> { + try (var h = jdbi.open()) { + acquiredLatch.countDown(); + boolean released = hold.await(10, TimeUnit.SECONDS); + assertThat(released).as("hold latch should be released by the test").isTrue(); + } + return null; + })); + } + + // Wait until both connections are actually acquired (avoid race) + boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS); + assertThat(bothAcquired).as("both connections should be acquired before third attempt").isTrue(); + + // Third attempt should block since the pool is full + Future third = es.submit(() -> { + var h = jdbi.open(); + h.close(); + return true; + }); + + boolean completedIn200ms = false; + try { + third.get(200, TimeUnit.MILLISECONDS); + completedIn200ms = true; // if this happens when connection was not blocked, which is wrong + } + catch (TimeoutException expected) { + // expected, means the request was blocked on the pool + } + + assertThat(completedIn200ms) + .as("third getJdbi().open() should be blocked by maxPoolSize=2") + .isFalse(); + + // Release the first two connections, the third one should complete now + hold.countDown(); + assertThat(third.get(3, TimeUnit.SECONDS)).isTrue(); + + // Wait for the first two to finish gracefully + for (Future f : acquired) { + f.get(3, TimeUnit.SECONDS); + } + } + } + + @Test + void doesNotBlockWhenMaxPoolSizeIsNull() + throws Exception + { + String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-nopool-" + System.currentTimeMillis()).toString(); + String jdbcUrl = "jdbc:h2:" + dbPath; + + // maxPoolSize == null -> no pool path + DataStoreConfiguration cfg = new DataStoreConfiguration( + jdbcUrl, "sa", "sa", "org.h2.Driver", + 4, true); + + JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg); + Jdbi jdbi = cm.getJdbi("testdb"); + + try (ExecutorService es = Executors.newFixedThreadPool(3)) { + try { + CountDownLatch hold = new CountDownLatch(1); + CountDownLatch acquiredLatch = new CountDownLatch(2); + + // Open two connections and keep them open + for (int i = 0; i < 2; i++) { + es.submit(() -> { + try (var h = jdbi.open()) { + acquiredLatch.countDown(); + boolean released = hold.await(10, TimeUnit.SECONDS); + assertThat(released).isTrue(); + } + return null; + }); + } + + // Wait until both connections are really open (avoid race conditions) + boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS); + assertThat(bothAcquired).isTrue(); + + // Third connection attempt should NOT block since no pool is used + Future third = es.submit(() -> { + var h = jdbi.open(); + h.close(); + return true; + }); + + boolean completedIn200ms; + try { + third.get(200, TimeUnit.MILLISECONDS); + completedIn200ms = true; // not blocked - expected behavior + } + catch (TimeoutException ignore) { + completedIn200ms = false; // blocked - incorrect for no-pool case + } + + assertThat(completedIn200ms) + .as("third getJdbi().open() should NOT block when no pool is configured") + .isTrue(); + + // check H2 session count to confirm multiple physical connections were opened + int sessions = jdbi.withHandle(h -> + h.createQuery("SELECT COUNT(*) FROM INFORMATION_SCHEMA.SESSIONS") + .mapTo(int.class) + .one()); + assertThat(sessions).isGreaterThanOrEqualTo(3); + + // Release the first two connections + hold.countDown(); + assertThat(third.get(3, TimeUnit.SECONDS)).isTrue(); + } + finally { + es.shutdownNow(); + } + } + } +}