Skip to content

Commit 4def63e

Browse files
committed
Add configurable query history recording via queryHistoryEnabled
1 parent 5a36940 commit 4def63e

File tree

8 files changed

+404
-1
lines changed

8 files changed

+404
-1
lines changed

gateway-ha/config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ routingRules:
66
rulesEngineEnabled: False
77
# rulesConfigPath: "src/main/resources/rules/routing_rules.yml"
88

9+
routing:
10+
# Enable or disable query history recording to database (default: true)
11+
queryHistoryEnabled: true
12+
13+
914
dataStore:
1015
jdbcUrl: jdbc:postgresql://localhost:5432/trino_gateway_db
1116
user: trino_gateway_db_admin

gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public class RoutingConfiguration
2525

2626
private String defaultRoutingGroup = "adhoc";
2727

28+
private boolean queryHistoryEnabled = true;
29+
2830
public Duration getAsyncTimeout()
2931
{
3032
return asyncTimeout;
@@ -54,4 +56,14 @@ public void setDefaultRoutingGroup(String defaultRoutingGroup)
5456
{
5557
this.defaultRoutingGroup = defaultRoutingGroup;
5658
}
59+
60+
public boolean isQueryHistoryEnabled()
61+
{
62+
return queryHistoryEnabled;
63+
}
64+
65+
public void setQueryHistoryEnabled(boolean queryHistoryEnabled)
66+
{
67+
this.queryHistoryEnabled = queryHistoryEnabled;
68+
}
5769
}

gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class ProxyRequestHandler
8989
private final List<String> statementPaths;
9090
private final boolean includeClusterInfoInResponse;
9191
private final ProxyResponseConfiguration proxyResponseConfiguration;
92+
private final boolean queryHistoryEnabled;
9293

9394
@Inject
9495
public ProxyRequestHandler(
@@ -106,6 +107,7 @@ public ProxyRequestHandler(
106107
statementPaths = haGatewayConfiguration.getStatementPaths();
107108
this.includeClusterInfoInResponse = haGatewayConfiguration.isIncludeClusterHostInResponse();
108109
proxyResponseConfiguration = haGatewayConfiguration.getProxyResponseConfiguration();
110+
this.queryHistoryEnabled = haGatewayConfiguration.getRouting().isQueryHistoryEnabled();
109111
}
110112

111113
@PreDestroy
@@ -292,7 +294,9 @@ private ProxyResponse recordBackendForQueryId(Request request, ProxyResponse res
292294
}
293295
queryDetail.setRoutingGroup(routingDestination.routingGroup());
294296
queryDetail.setExternalUrl(routingDestination.externalUrl());
295-
queryHistoryManager.submitQueryDetail(queryDetail);
297+
if (queryHistoryEnabled) {
298+
queryHistoryManager.submitQueryDetail(queryDetail);
299+
}
296300
return response;
297301
}
298302

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.config;
15+
16+
import io.airlift.units.Duration;
17+
import org.junit.jupiter.api.Test;
18+
19+
import static java.util.concurrent.TimeUnit.MINUTES;
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
class TestRoutingConfiguration
23+
{
24+
@Test
25+
void testDefaultValues()
26+
{
27+
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
28+
assertThat(routingConfiguration.getAsyncTimeout()).isEqualTo(new Duration(2, MINUTES));
29+
assertThat(routingConfiguration.isAddXForwardedHeaders()).isTrue();
30+
assertThat(routingConfiguration.getDefaultRoutingGroup()).isEqualTo("adhoc");
31+
assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue();
32+
}
33+
34+
@Test
35+
void testQueryHistoryEnabledSetter()
36+
{
37+
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
38+
assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue();
39+
40+
routingConfiguration.setQueryHistoryEnabled(false);
41+
assertThat(routingConfiguration.isQueryHistoryEnabled()).isFalse();
42+
43+
routingConfiguration.setQueryHistoryEnabled(true);
44+
assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue();
45+
}
46+
47+
@Test
48+
void testAllSetters()
49+
{
50+
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
51+
52+
Duration customTimeout = new Duration(5, MINUTES);
53+
routingConfiguration.setAsyncTimeout(customTimeout);
54+
assertThat(routingConfiguration.getAsyncTimeout()).isEqualTo(customTimeout);
55+
56+
routingConfiguration.setAddXForwardedHeaders(false);
57+
assertThat(routingConfiguration.isAddXForwardedHeaders()).isFalse();
58+
59+
routingConfiguration.setDefaultRoutingGroup("batch");
60+
assertThat(routingConfiguration.getDefaultRoutingGroup()).isEqualTo("batch");
61+
62+
routingConfiguration.setQueryHistoryEnabled(false);
63+
assertThat(routingConfiguration.isQueryHistoryEnabled()).isFalse();
64+
}
65+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.proxyserver;
15+
16+
import io.trino.gateway.ha.HaGatewayLauncher;
17+
import okhttp3.MediaType;
18+
import okhttp3.OkHttpClient;
19+
import okhttp3.Request;
20+
import okhttp3.RequestBody;
21+
import okhttp3.Response;
22+
import okhttp3.mockwebserver.Dispatcher;
23+
import okhttp3.mockwebserver.MockResponse;
24+
import okhttp3.mockwebserver.MockWebServer;
25+
import okhttp3.mockwebserver.RecordedRequest;
26+
import org.jdbi.v3.core.Jdbi;
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.TestInstance;
31+
import org.testcontainers.containers.PostgreSQLContainer;
32+
33+
import java.io.File;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
38+
import static com.google.common.net.MediaType.JSON_UTF_8;
39+
import static io.trino.gateway.ha.HaGatewayTestUtils.buildGatewayConfig;
40+
import static io.trino.gateway.ha.HaGatewayTestUtils.prepareMockBackend;
41+
import static io.trino.gateway.ha.HaGatewayTestUtils.setUpBackend;
42+
import static io.trino.gateway.ha.handler.HttpUtils.V1_STATEMENT_PATH;
43+
import static io.trino.gateway.ha.util.TestcontainersUtils.createPostgreSqlContainer;
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
46+
47+
@TestInstance(PER_CLASS)
48+
final class TestProxyRequestHandlerQueryHistoryDisabled
49+
{
50+
private final OkHttpClient httpClient = new OkHttpClient();
51+
private final MockWebServer mockTrinoServer = new MockWebServer();
52+
private final PostgreSQLContainer postgresql = createPostgreSqlContainer();
53+
54+
private final int routerPort = 22001 + (int) (Math.random() * 1000);
55+
private final int customBackendPort = 22000 + (int) (Math.random() * 1000);
56+
57+
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
58+
private static final String TEST_QUERY_ID = "20240101_123456_00000_abcde";
59+
60+
private final String healthCheckEndpoint = "/v1/info";
61+
private Jdbi jdbi;
62+
63+
@BeforeAll
64+
void setup()
65+
throws Exception
66+
{
67+
prepareMockBackend(mockTrinoServer, customBackendPort, "default custom response");
68+
mockTrinoServer.setDispatcher(new Dispatcher() {
69+
@Override
70+
public MockResponse dispatch(RecordedRequest request)
71+
{
72+
if (request.getPath().equals(healthCheckEndpoint)) {
73+
return new MockResponse().setResponseCode(200)
74+
.setHeader(CONTENT_TYPE, JSON_UTF_8)
75+
.setBody("{\"starting\": false}");
76+
}
77+
78+
if (request.getMethod().equals("POST") && request.getPath().equals(V1_STATEMENT_PATH)) {
79+
return new MockResponse().setResponseCode(200)
80+
.setHeader(CONTENT_TYPE, JSON_UTF_8)
81+
.setBody("{\"id\": \"" + TEST_QUERY_ID + "\", \"stats\": {}}");
82+
}
83+
84+
return new MockResponse().setResponseCode(404);
85+
}
86+
});
87+
88+
postgresql.start();
89+
90+
File testConfigFile = buildGatewayConfig(postgresql, routerPort, "test-config-with-query-history-disabled.yml");
91+
92+
String[] args = {testConfigFile.getAbsolutePath()};
93+
HaGatewayLauncher.main(args);
94+
95+
setUpBackend("custom", "http://localhost:" + customBackendPort, "externalUrl", true, "adhoc", routerPort);
96+
97+
jdbi = Jdbi.create(postgresql.getJdbcUrl(), postgresql.getUsername(), postgresql.getPassword());
98+
}
99+
100+
@AfterAll
101+
void cleanup()
102+
throws Exception
103+
{
104+
mockTrinoServer.shutdown();
105+
}
106+
107+
@Test
108+
void testQueryHistoryNotRecordedWhenDisabled()
109+
throws Exception
110+
{
111+
String url = "http://localhost:" + routerPort + V1_STATEMENT_PATH;
112+
String testQuery = "SELECT 1";
113+
RequestBody requestBody = RequestBody.create(testQuery, MEDIA_TYPE);
114+
115+
Request postRequest = new Request.Builder()
116+
.url(url)
117+
.addHeader("X-Trino-User", "test-user")
118+
.post(requestBody)
119+
.build();
120+
121+
try (Response response = httpClient.newCall(postRequest).execute()) {
122+
assertThat(response.isSuccessful()).isTrue();
123+
assertThat(response.body()).isNotNull();
124+
String responseBody = response.body().string();
125+
assertThat(responseBody).contains(TEST_QUERY_ID);
126+
}
127+
128+
// Verify that query history was NOT recorded in the database
129+
List<Map<String, Object>> queryHistory = jdbi.withHandle(handle ->
130+
handle.createQuery("SELECT * FROM query_history WHERE query_id = :queryId")
131+
.bind("queryId", TEST_QUERY_ID)
132+
.mapToMap()
133+
.list());
134+
135+
assertThat(queryHistory).isEmpty();
136+
}
137+
}

0 commit comments

Comments
 (0)