Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import static io.airlift.units.DataSize.Unit.KILOBYTE;

public class QueryDataEncodingConfig
{
private boolean jsonEnabled = true;
private boolean jsonZstdEnabled = true;
private boolean jsonLz4Enabled = true;
private DataSize compressionThreshold = DataSize.of(8, KILOBYTE);

public boolean isJsonEnabled()
{
Expand Down Expand Up @@ -60,4 +66,19 @@ public QueryDataEncodingConfig setJsonLz4Enabled(boolean jsonLz4Enabled)
this.jsonLz4Enabled = jsonLz4Enabled;
return this;
}

@MinDataSize("1kB")
@MaxDataSize("4MB")
public DataSize getCompressionThreshold()
{
return compressionThreshold;
}

@Config("protocol.spooling.encoding.compression.threshold")
@ConfigDescription("Do not compress segments smaller than threshold")
public QueryDataEncodingConfig setCompressionThreshold(DataSize compressionThreshold)
{
this.compressionThreshold = compressionThreshold;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder;
import io.trino.server.protocol.OutputColumn;
import io.trino.server.protocol.spooling.QueryDataEncoder;
import io.trino.server.protocol.spooling.QueryDataEncodingConfig;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -106,10 +107,18 @@ public String encoding()
public static class ZstdFactory
extends Factory
{
private final int compressionThreshold;

@Inject
public ZstdFactory(QueryDataEncodingConfig config)
{
this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes());
}

@Override
public QueryDataEncoder create(Session session, List<OutputColumn> columns)
{
return new ZstdQueryDataEncoder(super.create(session, columns));
return new ZstdQueryDataEncoder(super.create(session, columns), compressionThreshold);
}

@Override
Expand All @@ -122,10 +131,18 @@ public String encoding()
public static class Lz4Factory
extends Factory
{
private final int compressionThreshold;

@Inject
public Lz4Factory(QueryDataEncodingConfig config)
{
this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes());
}

@Override
public QueryDataEncoder create(Session session, List<OutputColumn> columns)
{
return new Lz4QueryDataEncoder(super.create(session, columns));
return new Lz4QueryDataEncoder(super.create(session, columns), compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
public class Lz4QueryDataEncoder
extends CompressedQueryDataEncoder
{
private static final int COMPRESSION_THRESHOLD = 8192;

public Lz4QueryDataEncoder(QueryDataEncoder delegate)
public Lz4QueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold)
{
super(delegate, COMPRESSION_THRESHOLD);
super(delegate, compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
public class ZstdQueryDataEncoder
extends CompressedQueryDataEncoder
{
private static final int COMPRESSION_THRESHOLD = 8192;

public ZstdQueryDataEncoder(QueryDataEncoder delegate)
public ZstdQueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold)
{
super(delegate, COMPRESSION_THRESHOLD);
super(delegate, compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package io.trino.server.protocol.spooling;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

class TestQueryDataEncodingConfig
{
Expand All @@ -30,7 +33,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(QueryDataEncodingConfig.class)
.setJsonEnabled(true)
.setJsonLz4Enabled(true)
.setJsonZstdEnabled(true));
.setJsonZstdEnabled(true)
.setCompressionThreshold(DataSize.of(8, KILOBYTE)));
}

@Test
Expand All @@ -40,12 +44,14 @@ public void testExplicitPropertyMappings()
.put("protocol.spooling.encoding.json.enabled", "false")
.put("protocol.spooling.encoding.json+lz4.enabled", "false")
.put("protocol.spooling.encoding.json+zstd.enabled", "false")
.put("protocol.spooling.encoding.compression.threshold", "1MB")
.buildOrThrow();

QueryDataEncodingConfig expected = new QueryDataEncodingConfig()
.setJsonEnabled(false)
.setJsonLz4Enabled(false)
.setJsonZstdEnabled(false);
.setJsonZstdEnabled(false)
.setCompressionThreshold(DataSize.of(1, MEGABYTE));

assertFullMapping(properties, expected);
}
Expand Down