Skip to content

Commit ab4bcb6

Browse files
committed
add optional db connection pool support
1 parent 5a36940 commit ab4bcb6

File tree

4 files changed

+188
-2
lines changed

4 files changed

+188
-2
lines changed

gateway-ha/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,12 @@
318318
<scope>runtime</scope>
319319
</dependency>
320320

321+
<dependency>
322+
<groupId>com.zaxxer</groupId>
323+
<artifactId>HikariCP</artifactId>
324+
<version>7.0.2</version>
325+
</dependency>
326+
321327
<!-- Test deps -->
322328
<dependency>
323329
<groupId>com.h2database</groupId>

gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,28 @@ public class DataStoreConfiguration
2121
private String driver;
2222
private Integer queryHistoryHoursRetention = 4;
2323
private boolean runMigrationsEnabled = true;
24+
private Integer maxPoolSize;
2425

25-
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
26-
{
26+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize) {
2727
this.jdbcUrl = jdbcUrl;
2828
this.user = user;
2929
this.password = password;
3030
this.driver = driver;
3131
this.queryHistoryHoursRetention = queryHistoryHoursRetention;
3232
this.runMigrationsEnabled = runMigrationsEnabled;
33+
this.maxPoolSize = maxPoolSize;
34+
}
35+
36+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled) {
37+
this(jdbcUrl, user, password, driver, queryHistoryHoursRetention, runMigrationsEnabled, null);
38+
}
39+
40+
public Integer getMaxPoolSize() {
41+
return this.maxPoolSize;
42+
}
43+
44+
public void setMaxPoolSize(Integer maxPoolSize) {
45+
this.maxPoolSize = maxPoolSize;
3346
}
3447

3548
public DataStoreConfiguration() {}

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package io.trino.gateway.ha.persistence;
1515

1616
import com.google.common.annotations.VisibleForTesting;
17+
import com.zaxxer.hikari.HikariConfig;
18+
import com.zaxxer.hikari.HikariDataSource;
1719
import io.airlift.log.Logger;
1820
import io.trino.gateway.ha.config.DataStoreConfiguration;
1921
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
@@ -59,6 +61,20 @@ public Jdbi getJdbi(@Nullable String routingGroupDatabase)
5961
return jdbi;
6062
}
6163

64+
if (configuration.getMaxPoolSize() != null && configuration.getMaxPoolSize() > 0) {
65+
HikariConfig hikariConfig = new HikariConfig();
66+
hikariConfig.setJdbcUrl(buildJdbcUrl(routingGroupDatabase));
67+
hikariConfig.setUsername(configuration.getUser());
68+
hikariConfig.setPassword(configuration.getPassword());
69+
if (configuration.getDriver() != null) {
70+
hikariConfig.setDriverClassName(configuration.getDriver());
71+
}
72+
hikariConfig.setMaximumPoolSize(configuration.getMaxPoolSize());
73+
return Jdbi.create(new HikariDataSource(hikariConfig))
74+
.installPlugin(new SqlObjectPlugin())
75+
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
76+
}
77+
6278
return Jdbi.create(buildJdbcUrl(routingGroupDatabase), configuration.getUser(), configuration.getPassword())
6379
.installPlugin(new SqlObjectPlugin())
6480
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package io.trino.gateway.ha.persistence;
2+
3+
import io.trino.gateway.ha.config.DataStoreConfiguration;
4+
import org.jdbi.v3.core.Jdbi;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.nio.file.Path;
8+
import java.sql.Connection;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.concurrent.*;
12+
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
final class TestJdbcConnectionManagerPool {
16+
@Test
17+
void blocksWhenExceedingMaxPoolSize() throws Exception {
18+
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-pool-" + System.currentTimeMillis()).toString();
19+
String jdbcUrl = "jdbc:h2:" + dbPath;
20+
21+
DataStoreConfiguration cfg = new DataStoreConfiguration(
22+
jdbcUrl, "sa", "sa", "org.h2.Driver",
23+
4, true,
24+
2
25+
);
26+
27+
JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
28+
Jdbi jdbi = cm.getJdbi("testdb");
29+
30+
try (ExecutorService es = Executors.newFixedThreadPool(3)) {
31+
List<Future<Connection>> acquired = new ArrayList<>();
32+
33+
CountDownLatch hold = new CountDownLatch(1);
34+
CountDownLatch acquiredLatch = new CountDownLatch(2);
35+
36+
// Open exactly maxPoolSize connections and keep them open
37+
for (int i = 0; i < 2; i++) {
38+
acquired.add(es.submit(() -> {
39+
try (var h = jdbi.open()) {
40+
acquiredLatch.countDown();
41+
boolean released = hold.await(10, TimeUnit.SECONDS);
42+
assertThat(released).as("hold latch should be released by the test").isTrue();
43+
}
44+
return null;
45+
}));
46+
}
47+
48+
// Wait until both connections are actually acquired (avoid race)
49+
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
50+
assertThat(bothAcquired).as("both connections should be acquired before third attempt").isTrue();
51+
52+
// Third attempt should block since the pool is full
53+
Future<Boolean> third = es.submit(() -> {
54+
var h = jdbi.open();
55+
h.close();
56+
return true;
57+
});
58+
59+
boolean completedIn200ms = false;
60+
try {
61+
third.get(200, TimeUnit.MILLISECONDS);
62+
completedIn200ms = true; // if this happens when connection was not blocked, which is wrong
63+
} catch (TimeoutException expected) {
64+
// expected, means the request was blocked on the pool
65+
}
66+
67+
assertThat(completedIn200ms)
68+
.as("third getJdbi().open() should be blocked by maxPoolSize=2")
69+
.isFalse();
70+
71+
// Release the first two connections, the third one should complete now
72+
hold.countDown();
73+
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
74+
75+
// Wait for the first two to finish gracefully
76+
for (Future<Connection> f : acquired) {
77+
f.get(3, TimeUnit.SECONDS);
78+
}
79+
}
80+
}
81+
82+
83+
@Test
84+
void doesNotBlockWhenMaxPoolSizeIsNull() throws Exception {
85+
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-nopool-" + System.currentTimeMillis()).toString();
86+
String jdbcUrl = "jdbc:h2:" + dbPath;
87+
88+
// maxPoolSize == null -> no pool path
89+
DataStoreConfiguration cfg = new DataStoreConfiguration(
90+
jdbcUrl, "sa", "sa", "org.h2.Driver",
91+
4, true);
92+
93+
JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
94+
Jdbi jdbi = cm.getJdbi("testdb");
95+
96+
try (ExecutorService es = Executors.newFixedThreadPool(3)) {
97+
try {
98+
CountDownLatch hold = new CountDownLatch(1);
99+
CountDownLatch acquiredLatch = new CountDownLatch(2);
100+
101+
// Open two connections and keep them open
102+
for (int i = 0; i < 2; i++) {
103+
es.submit(() -> {
104+
try (var h = jdbi.open()) {
105+
acquiredLatch.countDown();
106+
boolean released = hold.await(10, TimeUnit.SECONDS);
107+
assertThat(released).isTrue();
108+
}
109+
return null;
110+
});
111+
}
112+
113+
// Wait until both connections are really open (avoid race conditions)
114+
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
115+
assertThat(bothAcquired).isTrue();
116+
117+
// Third connection attempt should NOT block since no pool is used
118+
Future<Boolean> third = es.submit(() -> {
119+
var h = jdbi.open();
120+
h.close();
121+
return true;
122+
});
123+
124+
boolean completedIn200ms;
125+
try {
126+
third.get(200, TimeUnit.MILLISECONDS);
127+
completedIn200ms = true; // not blocked - expected behavior
128+
} catch (TimeoutException ignore) {
129+
completedIn200ms = false; // blocked - incorrect for no-pool case
130+
}
131+
132+
assertThat(completedIn200ms)
133+
.as("third getJdbi().open() should NOT block when no pool is configured")
134+
.isTrue();
135+
136+
// check H2 session count to confirm multiple physical connections were opened
137+
int sessions = jdbi.withHandle(h ->
138+
h.createQuery("SELECT COUNT(*) FROM INFORMATION_SCHEMA.SESSIONS")
139+
.mapTo(int.class)
140+
.one());
141+
assertThat(sessions).isGreaterThanOrEqualTo(3);
142+
143+
// Release the first two connections
144+
hold.countDown();
145+
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
146+
} finally {
147+
es.shutdownNow();
148+
}
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)