From 4def63ea90eaf8bddb80bcf7b746d28701529c62 Mon Sep 17 00:00:00 2001 From: hpopuri Date: Wed, 26 Nov 2025 12:00:20 +0530 Subject: [PATCH] Add configurable query history recording via queryHistoryEnabled --- gateway-ha/config.yaml | 5 + .../ha/config/RoutingConfiguration.java | 12 ++ .../proxyserver/ProxyRequestHandler.java | 6 +- .../ha/config/TestRoutingConfiguration.java | 65 ++++++++ ...oxyRequestHandlerQueryHistoryDisabled.java | 137 ++++++++++++++++ ...roxyRequestHandlerQueryHistoryEnabled.java | 146 ++++++++++++++++++ .../test/resources/test-config-template.yml | 3 + ...est-config-with-query-history-disabled.yml | 31 ++++ 8 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 gateway-ha/src/test/java/io/trino/gateway/ha/config/TestRoutingConfiguration.java create mode 100644 gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryDisabled.java create mode 100644 gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryEnabled.java create mode 100644 gateway-ha/src/test/resources/test-config-with-query-history-disabled.yml diff --git a/gateway-ha/config.yaml b/gateway-ha/config.yaml index c9e0c4a29..2fa50cb2b 100644 --- a/gateway-ha/config.yaml +++ b/gateway-ha/config.yaml @@ -6,6 +6,11 @@ routingRules: rulesEngineEnabled: False # rulesConfigPath: "src/main/resources/rules/routing_rules.yml" +routing: + # Enable or disable query history recording to database (default: true) + queryHistoryEnabled: true + + dataStore: jdbcUrl: jdbc:postgresql://localhost:5432/trino_gateway_db user: trino_gateway_db_admin diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java index 035218d3d..93426675e 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java @@ -25,6 +25,8 @@ public class RoutingConfiguration private String defaultRoutingGroup = "adhoc"; + private boolean queryHistoryEnabled = true; + public Duration getAsyncTimeout() { return asyncTimeout; @@ -54,4 +56,14 @@ public void setDefaultRoutingGroup(String defaultRoutingGroup) { this.defaultRoutingGroup = defaultRoutingGroup; } + + public boolean isQueryHistoryEnabled() + { + return queryHistoryEnabled; + } + + public void setQueryHistoryEnabled(boolean queryHistoryEnabled) + { + this.queryHistoryEnabled = queryHistoryEnabled; + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java b/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java index 8da9713ca..3b195cf8a 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java @@ -89,6 +89,7 @@ public class ProxyRequestHandler private final List statementPaths; private final boolean includeClusterInfoInResponse; private final ProxyResponseConfiguration proxyResponseConfiguration; + private final boolean queryHistoryEnabled; @Inject public ProxyRequestHandler( @@ -106,6 +107,7 @@ public ProxyRequestHandler( statementPaths = haGatewayConfiguration.getStatementPaths(); this.includeClusterInfoInResponse = haGatewayConfiguration.isIncludeClusterHostInResponse(); proxyResponseConfiguration = haGatewayConfiguration.getProxyResponseConfiguration(); + this.queryHistoryEnabled = haGatewayConfiguration.getRouting().isQueryHistoryEnabled(); } @PreDestroy @@ -292,7 +294,9 @@ private ProxyResponse recordBackendForQueryId(Request request, ProxyResponse res } queryDetail.setRoutingGroup(routingDestination.routingGroup()); queryDetail.setExternalUrl(routingDestination.externalUrl()); - queryHistoryManager.submitQueryDetail(queryDetail); + if (queryHistoryEnabled) { + queryHistoryManager.submitQueryDetail(queryDetail); + } return response; } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestRoutingConfiguration.java b/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestRoutingConfiguration.java new file mode 100644 index 000000000..66d1d3181 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestRoutingConfiguration.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +import io.airlift.units.Duration; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; + +class TestRoutingConfiguration +{ + @Test + void testDefaultValues() + { + RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + assertThat(routingConfiguration.getAsyncTimeout()).isEqualTo(new Duration(2, MINUTES)); + assertThat(routingConfiguration.isAddXForwardedHeaders()).isTrue(); + assertThat(routingConfiguration.getDefaultRoutingGroup()).isEqualTo("adhoc"); + assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue(); + } + + @Test + void testQueryHistoryEnabledSetter() + { + RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue(); + + routingConfiguration.setQueryHistoryEnabled(false); + assertThat(routingConfiguration.isQueryHistoryEnabled()).isFalse(); + + routingConfiguration.setQueryHistoryEnabled(true); + assertThat(routingConfiguration.isQueryHistoryEnabled()).isTrue(); + } + + @Test + void testAllSetters() + { + RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + + Duration customTimeout = new Duration(5, MINUTES); + routingConfiguration.setAsyncTimeout(customTimeout); + assertThat(routingConfiguration.getAsyncTimeout()).isEqualTo(customTimeout); + + routingConfiguration.setAddXForwardedHeaders(false); + assertThat(routingConfiguration.isAddXForwardedHeaders()).isFalse(); + + routingConfiguration.setDefaultRoutingGroup("batch"); + assertThat(routingConfiguration.getDefaultRoutingGroup()).isEqualTo("batch"); + + routingConfiguration.setQueryHistoryEnabled(false); + assertThat(routingConfiguration.isQueryHistoryEnabled()).isFalse(); + } +} diff --git a/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryDisabled.java b/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryDisabled.java new file mode 100644 index 000000000..36f2be76d --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryDisabled.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.proxyserver; + +import io.trino.gateway.ha.HaGatewayLauncher; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import static com.google.common.net.HttpHeaders.CONTENT_TYPE; +import static com.google.common.net.MediaType.JSON_UTF_8; +import static io.trino.gateway.ha.HaGatewayTestUtils.buildGatewayConfig; +import static io.trino.gateway.ha.HaGatewayTestUtils.prepareMockBackend; +import static io.trino.gateway.ha.HaGatewayTestUtils.setUpBackend; +import static io.trino.gateway.ha.handler.HttpUtils.V1_STATEMENT_PATH; +import static io.trino.gateway.ha.util.TestcontainersUtils.createPostgreSqlContainer; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestProxyRequestHandlerQueryHistoryDisabled +{ + private final OkHttpClient httpClient = new OkHttpClient(); + private final MockWebServer mockTrinoServer = new MockWebServer(); + private final PostgreSQLContainer postgresql = createPostgreSqlContainer(); + + private final int routerPort = 22001 + (int) (Math.random() * 1000); + private final int customBackendPort = 22000 + (int) (Math.random() * 1000); + + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final String TEST_QUERY_ID = "20240101_123456_00000_abcde"; + + private final String healthCheckEndpoint = "/v1/info"; + private Jdbi jdbi; + + @BeforeAll + void setup() + throws Exception + { + prepareMockBackend(mockTrinoServer, customBackendPort, "default custom response"); + mockTrinoServer.setDispatcher(new Dispatcher() { + @Override + public MockResponse dispatch(RecordedRequest request) + { + if (request.getPath().equals(healthCheckEndpoint)) { + return new MockResponse().setResponseCode(200) + .setHeader(CONTENT_TYPE, JSON_UTF_8) + .setBody("{\"starting\": false}"); + } + + if (request.getMethod().equals("POST") && request.getPath().equals(V1_STATEMENT_PATH)) { + return new MockResponse().setResponseCode(200) + .setHeader(CONTENT_TYPE, JSON_UTF_8) + .setBody("{\"id\": \"" + TEST_QUERY_ID + "\", \"stats\": {}}"); + } + + return new MockResponse().setResponseCode(404); + } + }); + + postgresql.start(); + + File testConfigFile = buildGatewayConfig(postgresql, routerPort, "test-config-with-query-history-disabled.yml"); + + String[] args = {testConfigFile.getAbsolutePath()}; + HaGatewayLauncher.main(args); + + setUpBackend("custom", "http://localhost:" + customBackendPort, "externalUrl", true, "adhoc", routerPort); + + jdbi = Jdbi.create(postgresql.getJdbcUrl(), postgresql.getUsername(), postgresql.getPassword()); + } + + @AfterAll + void cleanup() + throws Exception + { + mockTrinoServer.shutdown(); + } + + @Test + void testQueryHistoryNotRecordedWhenDisabled() + throws Exception + { + String url = "http://localhost:" + routerPort + V1_STATEMENT_PATH; + String testQuery = "SELECT 1"; + RequestBody requestBody = RequestBody.create(testQuery, MEDIA_TYPE); + + Request postRequest = new Request.Builder() + .url(url) + .addHeader("X-Trino-User", "test-user") + .post(requestBody) + .build(); + + try (Response response = httpClient.newCall(postRequest).execute()) { + assertThat(response.isSuccessful()).isTrue(); + assertThat(response.body()).isNotNull(); + String responseBody = response.body().string(); + assertThat(responseBody).contains(TEST_QUERY_ID); + } + + // Verify that query history was NOT recorded in the database + List> queryHistory = jdbi.withHandle(handle -> + handle.createQuery("SELECT * FROM query_history WHERE query_id = :queryId") + .bind("queryId", TEST_QUERY_ID) + .mapToMap() + .list()); + + assertThat(queryHistory).isEmpty(); + } +} diff --git a/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryEnabled.java b/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryEnabled.java new file mode 100644 index 000000000..526105450 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandlerQueryHistoryEnabled.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.proxyserver; + +import io.trino.gateway.ha.HaGatewayLauncher; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import static com.google.common.net.HttpHeaders.CONTENT_TYPE; +import static com.google.common.net.MediaType.JSON_UTF_8; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static io.trino.gateway.ha.HaGatewayTestUtils.buildGatewayConfig; +import static io.trino.gateway.ha.HaGatewayTestUtils.prepareMockBackend; +import static io.trino.gateway.ha.HaGatewayTestUtils.setUpBackend; +import static io.trino.gateway.ha.handler.HttpUtils.V1_STATEMENT_PATH; +import static io.trino.gateway.ha.util.TestcontainersUtils.createPostgreSqlContainer; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestProxyRequestHandlerQueryHistoryEnabled +{ + private final OkHttpClient httpClient = new OkHttpClient(); + private final MockWebServer mockTrinoServer = new MockWebServer(); + private final PostgreSQLContainer postgresql = createPostgreSqlContainer(); + + private final int routerPort = 23001 + (int) (Math.random() * 1000); + private final int customBackendPort = 23000 + (int) (Math.random() * 1000); + + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final String TEST_QUERY_ID = "20240101_123456_00001_xyzab"; + + private final String healthCheckEndpoint = "/v1/info"; + private Jdbi jdbi; + + @BeforeAll + void setup() + throws Exception + { + prepareMockBackend(mockTrinoServer, customBackendPort, "default custom response"); + mockTrinoServer.setDispatcher(new Dispatcher() { + @Override + public MockResponse dispatch(RecordedRequest request) + { + if (request.getPath().equals(healthCheckEndpoint)) { + return new MockResponse().setResponseCode(200) + .setHeader(CONTENT_TYPE, JSON_UTF_8) + .setBody("{\"starting\": false}"); + } + + if (request.getMethod().equals("POST") && request.getPath().equals(V1_STATEMENT_PATH)) { + return new MockResponse().setResponseCode(200) + .setHeader(CONTENT_TYPE, JSON_UTF_8) + .setBody("{\"id\": \"" + TEST_QUERY_ID + "\", \"stats\": {}}"); + } + + return new MockResponse().setResponseCode(404); + } + }); + + postgresql.start(); + + // Use default test config (query history enabled by default) + File testConfigFile = buildGatewayConfig(postgresql, routerPort, "test-config-template.yml"); + + String[] args = {testConfigFile.getAbsolutePath()}; + HaGatewayLauncher.main(args); + + setUpBackend("custom-enabled", "http://localhost:" + customBackendPort, "externalUrl", true, "adhoc", routerPort); + + jdbi = Jdbi.create(postgresql.getJdbcUrl(), postgresql.getUsername(), postgresql.getPassword()); + } + + @AfterAll + void cleanup() + throws Exception + { + mockTrinoServer.shutdown(); + } + + @Test + void testQueryHistoryRecordedWhenEnabled() + throws Exception + { + String url = "http://localhost:" + routerPort + V1_STATEMENT_PATH; + String testQuery = "SELECT 2"; + RequestBody requestBody = RequestBody.create(testQuery, MEDIA_TYPE); + + Request postRequest = new Request.Builder() + .url(url) + .addHeader("X-Trino-User", "test-user-enabled") + .post(requestBody) + .build(); + + try (Response response = httpClient.newCall(postRequest).execute()) { + assertThat(response.isSuccessful()).isTrue(); + assertThat(response.body()).isNotNull(); + String responseBody = response.body().string(); + assertThat(responseBody).contains(TEST_QUERY_ID); + } + + // Wait a bit for async query history submission + sleepUninterruptibly(2, SECONDS); + + // Verify that query history WAS recorded in the database + List> queryHistory = jdbi.withHandle(handle -> + handle.createQuery("SELECT * FROM query_history WHERE query_id = :queryId") + .bind("queryId", TEST_QUERY_ID) + .mapToMap() + .list()); + + assertThat(queryHistory).hasSize(1); + assertThat(queryHistory.get(0).get("query_id")).isEqualTo(TEST_QUERY_ID); + assertThat(queryHistory.get(0).get("user_name")).isEqualTo("test-user-enabled"); + assertThat(queryHistory.get(0).get("query_text")).isEqualTo(testQuery); + } +} diff --git a/gateway-ha/src/test/resources/test-config-template.yml b/gateway-ha/src/test/resources/test-config-template.yml index 03ba6425d..b66d228c6 100644 --- a/gateway-ha/src/test/resources/test-config-template.yml +++ b/gateway-ha/src/test/resources/test-config-template.yml @@ -15,6 +15,9 @@ clusterStatsConfiguration: monitor: taskDelay: 1s +routing: + queryHistoryEnabled: true + extraWhitelistPaths: - '/v1/custom.*' - '/custom/logout.*' diff --git a/gateway-ha/src/test/resources/test-config-with-query-history-disabled.yml b/gateway-ha/src/test/resources/test-config-with-query-history-disabled.yml new file mode 100644 index 000000000..8c097fb3a --- /dev/null +++ b/gateway-ha/src/test/resources/test-config-with-query-history-disabled.yml @@ -0,0 +1,31 @@ +serverConfig: + node.environment: test + http-server.http.port: ${ENV:REQUEST_ROUTER_PORT} + +includeClusterHostInResponse: true +dataStore: + jdbcUrl: ${ENV:POSTGRESQL_JDBC_URL} + user: ${ENV:POSTGRESQL_USER} + password: ${ENV:POSTGRESQL_PASSWORD} + driver: org.postgresql.Driver + +clusterStatsConfiguration: + monitorType: INFO_API + +monitor: + taskDelay: 1s + +routing: + queryHistoryEnabled: false + +extraWhitelistPaths: + - '/v1/custom.*' + - '/custom/logout.*' + +gatewayCookieConfiguration: + enabled: true + cookieSigningSecret: "kjlhbfrewbyuo452cds3dc1234ancdsjh" + +oauth2GatewayCookieConfiguration: + deletePaths: + - "/custom/logout" \ No newline at end of file