Skip to content

Commit 3a7f15f

Browse files
Chaho12Jaeho Yoo
authored andcommitted
Add support for request and response compression
1 parent 016c21f commit 3a7f15f

File tree

3 files changed

+116
-16
lines changed

3 files changed

+116
-16
lines changed

gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,6 @@ public ProxyRequestHandler(
109109
proxyResponseConfiguration = haGatewayConfiguration.getProxyResponseConfiguration();
110110
}
111111

112-
private static String getRemoteTarget(URI remoteUri)
113-
{
114-
return format("%s://%s", remoteUri.getScheme(), remoteUri.getAuthority());
115-
}
116-
117112
private static Response handleProxyException(Request request, ProxyException e)
118113
{
119114
log.warn(e, "Proxy request failed: %s %s", request.getMethod(), request.getUri());
@@ -129,8 +124,7 @@ private static WebApplicationException badRequest(String message)
129124
.build());
130125
}
131126

132-
public static QueryHistoryManager.QueryDetail getQueryDetailsFromRequest(Request request,
133-
Optional<String> username)
127+
public static QueryHistoryManager.QueryDetail getQueryDetailsFromRequest(Request request, Optional<String> username)
134128
{
135129
QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail();
136130
queryDetail.setBackendUrl(getRemoteTarget(request.getUri()));
@@ -143,6 +137,11 @@ public static QueryHistoryManager.QueryDetail getQueryDetailsFromRequest(Request
143137
return queryDetail;
144138
}
145139

140+
private static String getRemoteTarget(URI remoteUri)
141+
{
142+
return format("%s://%s", remoteUri.getScheme(), remoteUri.getAuthority());
143+
}
144+
146145
@PreDestroy
147146
public void shutdown()
148147
{
@@ -201,8 +200,7 @@ private void performRequest(
201200
for (String name : list(servletRequest.getHeaderNames())) {
202201
for (String value : list(servletRequest.getHeaders(name))) {
203202
// TODO: decide what should and shouldn't be forwarded
204-
if (!name.equalsIgnoreCase("Accept-Encoding")
205-
&& !name.equalsIgnoreCase("Host")
203+
if (!name.equalsIgnoreCase("Host")
206204
&& (addXForwardedHeaders || !name.startsWith("X-Forwarded"))) {
207205
requestBuilder.addHeader(name, value);
208206
}
@@ -262,7 +260,7 @@ else if (servletRequest.getCookies() != null) {
262260

263261
private Response buildResponse(ProxyResponse response, ImmutableList<NewCookie> cookie)
264262
{
265-
Response.ResponseBuilder builder = Response.status(response.statusCode()).entity(response.body());
263+
Response.ResponseBuilder builder = Response.status(response.statusCode()).entity(response.getRawBody());
266264
response.headers().forEach((headerName, value) -> builder.header(headerName.toString(), value));
267265
cookie.forEach(builder::cookie);
268266
return builder.build();
@@ -287,26 +285,27 @@ private FluentFuture<ProxyResponse> executeHttp(Request request)
287285
private ProxyResponse recordBackendForQueryId(Request request, ProxyResponse response, Optional<String> username,
288286
RoutingDestination routingDestination)
289287
{
290-
log.debug("For Request [%s] got Response [%s]", request.getUri(), response.body());
288+
String body = response.getDecompressedBody();
289+
log.debug("For Request [%s] got Response [%s]", request.getUri(), body);
291290

292291
QueryHistoryManager.QueryDetail queryDetail = getQueryDetailsFromRequest(request, username);
293292

294293
log.debug("Extracting proxy destination : [%s] for request : [%s]", queryDetail.getBackendUrl(), request.getUri());
295294

296295
if (response.statusCode() == OK.getStatusCode()) {
297296
try {
298-
HashMap<String, String> results = OBJECT_MAPPER.readValue(response.body(), HashMap.class);
297+
HashMap<String, String> results = OBJECT_MAPPER.readValue(body, HashMap.class);
299298
queryDetail.setQueryId(results.get("id"));
300299
routingManager.setBackendForQueryId(queryDetail.getQueryId(), queryDetail.getBackendUrl());
301300
routingManager.setRoutingGroupForQueryId(queryDetail.getQueryId(), routingDestination.routingGroup());
302301
log.debug("QueryId [%s] mapped with proxy [%s]", queryDetail.getQueryId(), queryDetail.getBackendUrl());
303302
}
304303
catch (IOException e) {
305-
log.error("Failed to get QueryId from response [%s] , Status code [%s]", response.body(), response.statusCode());
304+
log.error("Failed to get QueryId from response [%s] , Status code [%s]", body, response.statusCode());
306305
}
307306
}
308307
else {
309-
log.error("Non OK HTTP Status code with response [%s] , Status code [%s], user: [%s]", response.body(), response.statusCode(), username.orElse(null));
308+
log.error("Non OK HTTP Status code with response [%s] , Status code [%s], user: [%s]", body, response.statusCode(), username.orElse(null));
310309
}
311310
queryDetail.setRoutingGroup(routingDestination.routingGroup());
312311
queryDetail.setExternalUrl(routingDestination.externalUrl());

gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyResponseHandler.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import io.trino.gateway.ha.config.ProxyResponseConfiguration;
2323
import io.trino.gateway.proxyserver.ProxyResponseHandler.ProxyResponse;
2424

25+
import java.io.ByteArrayInputStream;
2526
import java.io.IOException;
27+
import java.io.InputStream;
2628
import java.nio.charset.StandardCharsets;
29+
import java.util.zip.GZIPInputStream;
2730

2831
import static java.util.Objects.requireNonNull;
2932

@@ -47,7 +50,9 @@ public ProxyResponse handleException(Request request, Exception exception)
4750
public ProxyResponse handle(Request request, Response response)
4851
{
4952
try {
50-
return new ProxyResponse(response.getStatusCode(), response.getHeaders(), new String(response.getInputStream().readNBytes((int) responseSize.toBytes()), StandardCharsets.UTF_8));
53+
// Store raw bytes to preserve compression
54+
byte[] responseBodyBytes = response.getInputStream().readNBytes((int) responseSize.toBytes());
55+
return new ProxyResponse(response.getStatusCode(), response.getHeaders(), responseBodyBytes);
5156
}
5257
catch (IOException e) {
5358
throw new ProxyException("Failed reading response from remote Trino server", e);
@@ -57,11 +62,53 @@ public ProxyResponse handle(Request request, Response response)
5762
public record ProxyResponse(
5863
int statusCode,
5964
ListMultimap<HeaderName, String> headers,
60-
String body)
65+
byte[] body)
6166
{
6267
public ProxyResponse
6368
{
6469
requireNonNull(headers, "headers is null");
70+
requireNonNull(body, "body is null");
71+
}
72+
73+
/**
74+
* Get the response body as raw bytes for sending to clients (preserves
75+
* compression)
76+
*/
77+
public byte[] getRawBody()
78+
{
79+
return body;
80+
}
81+
82+
/**
83+
* Get the response body as a decompressed string for JSON parsing and logging.
84+
* Only call this when you need to parse the content, not when passing through
85+
* to clients.
86+
*/
87+
public String getDecompressedBody()
88+
{
89+
// Check if the response is gzip-compressed
90+
String contentEncoding = null;
91+
for (HeaderName headerName : headers.keySet()) {
92+
if (headerName.toString().equalsIgnoreCase("Content-Encoding")) {
93+
contentEncoding = headers.get(headerName).iterator().next();
94+
break;
95+
}
96+
}
97+
98+
if ("gzip".equalsIgnoreCase(contentEncoding)) {
99+
try {
100+
try (InputStream inputStream = new GZIPInputStream(new ByteArrayInputStream(body))) {
101+
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
102+
}
103+
}
104+
catch (IOException e) {
105+
// If decompression fails, return the body as UTF-8 string
106+
return new String(body, StandardCharsets.UTF_8);
107+
}
108+
}
109+
110+
// Not compressed, convert bytes to string
111+
return new String(body, StandardCharsets.UTF_8);
65112
}
66113
}
67114
}

gateway-ha/src/test/java/io/trino/gateway/proxyserver/TestProxyRequestHandler.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ final class TestProxyRequestHandler
5353
private static final String OK = "OK";
5454
private static final int NOT_FOUND = 404;
5555
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
56+
5657
private final OkHttpClient httpClient = new OkHttpClient();
5758
private final MockWebServer mockTrinoServer = new MockWebServer();
5859
private final PostgreSQLContainer postgresql = new PostgreSQLContainer("postgres:17");
@@ -77,6 +78,14 @@ public MockResponse dispatch(RecordedRequest request)
7778
.setBody("{\"starting\": false}");
7879
}
7980

81+
if (request.getPath().equals(healthCheckEndpoint + "?test-compression")) {
82+
// Return the Accept-Encoding header value for compression testing
83+
String acceptEncoding = request.getHeader("Accept-Encoding");
84+
return new MockResponse().setResponseCode(200)
85+
.setHeader(CONTENT_TYPE, JSON_UTF_8)
86+
.setBody(acceptEncoding != null ? acceptEncoding : "null");
87+
}
88+
8089
if (request.getMethod().equals("PUT") && request.getPath().equals(customPutEndpoint)) {
8190
return new MockResponse().setResponseCode(200)
8291
.setHeader(CONTENT_TYPE, JSON_UTF_8)
@@ -156,4 +165,49 @@ void testGetQueryDetailsFromRequest()
156165
assertThat(queryDetail.getSource()).isEqualTo("trino-cli");
157166
assertThat(queryDetail.getBackendUrl()).isEqualTo("http://localhost:" + routerPort);
158167
}
168+
169+
@Test
170+
void testAcceptEncodingHeaderForwarding()
171+
throws Exception
172+
{
173+
// Test that Accept-Encoding header is properly forwarded to backends
174+
String url = "http://localhost:" + routerPort + healthCheckEndpoint + "?test-compression";
175+
String expectedAcceptEncoding = "gzip, deflate, br";
176+
177+
Request request = new Request.Builder()
178+
.url(url)
179+
.get()
180+
.addHeader("Accept-Encoding", expectedAcceptEncoding)
181+
.build();
182+
183+
try (Response response = httpClient.newCall(request).execute()) {
184+
assertThat(response.code()).isEqualTo(200);
185+
assertThat(response.body()).isNotNull();
186+
187+
// The mock backend returns the Accept-Encoding header value in the response body
188+
assertThat(response.body().string()).isEqualTo(expectedAcceptEncoding);
189+
}
190+
}
191+
192+
@Test
193+
void testDefaultAcceptEncodingHeaderForwarding()
194+
throws Exception
195+
{
196+
// Test that requests without explicit Accept-Encoding header work correctly
197+
// Note: OkHttp automatically adds "Accept-Encoding: gzip" when none is specified
198+
String url = "http://localhost:" + routerPort + healthCheckEndpoint + "?test-compression";
199+
200+
Request request = new Request.Builder()
201+
.url(url)
202+
.get()
203+
.build(); // No explicit Accept-Encoding header
204+
205+
try (Response response = httpClient.newCall(request).execute()) {
206+
assertThat(response.code()).isEqualTo(200);
207+
assertThat(response.body()).isNotNull();
208+
209+
// OkHttp automatically adds "Accept-Encoding: gzip" when none is specified
210+
assertThat(response.body().string()).isEqualTo("gzip");
211+
}
212+
}
159213
}

0 commit comments

Comments
 (0)