diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java b/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java
index dab1e3df2e98..34f1e6172cbe 100644
--- a/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java
+++ b/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java
@@ -16,7 +16,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import io.airlift.units.DataSize;
-import io.airlift.units.DataSize.Unit;
import java.util.ArrayList;
import java.util.Iterator;
@@ -28,18 +27,48 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
+/**
+ * DictionaryCompressionOptimizer has two objectives:
+ * 1) Bound the dictionary memory of the reader when all columns are read. The reader's dictionary
+ * memory should not exceed dictionaryMemoryMaxBytesHigh.
+ * 2) When dictionary encoding for a column produces a size comparable to direct encoding, choose
+ * direct encoding over dictionary encoding. Dictionary encoding/decoding is memory and CPU intensive,
+ * so for comparable column sizes direct encoding is usually better.
+ *
+ * Note: Dictionary writers might use more memory than this bound, since they over-allocate dictionary
+ * capacity as they build dictionaries while seeing new data. The hash table implementations in the
+ * dictionary writers allocate hash buckets in powers of 2, so past a million entries the
+ * over-allocation consumes a large amount of memory.
+ *
+ * DictionaryCompressionOptimizer behavior can be controlled by the following constructor parameters:
+ *
+ * 1. dictionaryMemoryMaxBytes -> Maximum size of the dictionaries when all columns are read. Note: the
+ * writer might consume more memory due to the over-allocation described above.
+ *
+ * 2. dictionaryMemoryAlmostFullRangeBytes -> When the dictionary size exceeds dictionaryMemoryMaxBytes,
+ * dictionary columns are converted to direct to reduce the dictionary size. By setting a range,
+ * the stripe can be flushed before the dictionary is completely full. When the dictionary size is above
+ * (dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes), it is considered almost full
+ * and ready for flushing. This setting is defined as a delta on dictionaryMemoryMaxBytes for backward compatibility.
+ *
+ * 3. dictionaryUsefulCheckColumnSizeBytes -> Columns start out dictionary encoded, and when dictionary
+ * memory is almost full, the usefulness of each dictionary is measured. For large dictionaries (> 40 MB)
+ * that check might happen very late, and a large dictionary might cause the writer to OOM because the
+ * writer over-allocates for dictionary growth. Once a column's dictionary grows beyond
+ * dictionaryUsefulCheckColumnSizeBytes, the usefulness check is performed, and if the dictionary is not
+ * useful, the column is converted to direct.
+ *
+ * 4. dictionaryUsefulCheckPerChunkFrequency -> The usefulness check could be costly if performed on every
+ * chunk, so it runs only once every dictionaryUsefulCheckPerChunkFrequency chunks, and only while a
+ * column's dictionary is above dictionaryUsefulCheckColumnSizeBytes.
+ */
public class DictionaryCompressionOptimizer
{
private static final double DICTIONARY_MIN_COMPRESSION_RATIO = 1.25;
- // Instead of waiting for the dictionary to fill completely, which would force a column into
- // direct mode, close the stripe early assuming it has hit the minimum row count.
- static final DataSize DICTIONARY_MEMORY_MAX_RANGE = new DataSize(4, Unit.MEGABYTE);
-
- static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, Unit.MEGABYTE);
+ static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, MEGABYTE);
private final List<DictionaryColumnManager> allWriters;
private final List<DictionaryColumnManager> directConversionCandidates = new ArrayList<>();
@@ -49,15 +78,21 @@ public class DictionaryCompressionOptimizer
private final int stripeMaxRowCount;
private final int dictionaryMemoryMaxBytesLow;
private final int dictionaryMemoryMaxBytesHigh;
+ private final int dictionaryUsefulCheckColumnSizeBytes;
+ private final int dictionaryUsefulCheckPerChunkFrequency;
private int dictionaryMemoryBytes;
+ private int dictionaryUsefulCheckCounter;
public DictionaryCompressionOptimizer(
Set<? extends DictionaryColumn> writers,
int stripeMinBytes,
int stripeMaxBytes,
int stripeMaxRowCount,
- int dictionaryMemoryMaxBytes)
+ int dictionaryMemoryMaxBytes,
+ int dictionaryMemoryAlmostFullRangeBytes,
+ int dictionaryUsefulCheckColumnSizeBytes,
+ int dictionaryUsefulCheckPerChunkFrequency)
{
requireNonNull(writers, "writers is null");
this.allWriters = writers.stream()
@@ -74,9 +109,14 @@ public DictionaryCompressionOptimizer(
this.stripeMaxRowCount = stripeMaxRowCount;
checkArgument(dictionaryMemoryMaxBytes >= 0, "dictionaryMemoryMaxBytes is negative");
+ checkArgument(dictionaryMemoryAlmostFullRangeBytes >= 0, "dictionaryMemoryAlmostFullRangeBytes is negative");
this.dictionaryMemoryMaxBytesHigh = dictionaryMemoryMaxBytes;
- this.dictionaryMemoryMaxBytesLow = (int) Math.max(dictionaryMemoryMaxBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes(), 0);
+ this.dictionaryMemoryMaxBytesLow = Math.max(dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes, 0);
+ checkArgument(dictionaryUsefulCheckPerChunkFrequency >= 0, "dictionaryUsefulCheckPerChunkFrequency is negative");
+ this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency;
+
+ this.dictionaryUsefulCheckColumnSizeBytes = dictionaryUsefulCheckColumnSizeBytes;
directConversionCandidates.addAll(allWriters);
}
@@ -87,12 +127,12 @@ public int getDictionaryMemoryBytes()
public boolean isFull(long bufferedBytes)
{
- // if the strip is big enough to flush, stop before we hit the absolute max, so we are
+ // if the stripe is big enough to flush, stop before we hit the absolute max, so we are
// not forced to convert a dictionary to direct to fit in memory
if (bufferedBytes > stripeMinBytes) {
return dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow;
}
- // strip is small, grow to the high water mark (so at the very least we have more information)
+ // stripe is small, grow to the high watermark (so at the very least we have more information)
return dictionaryMemoryBytes > dictionaryMemoryMaxBytesHigh;
}
@@ -107,30 +147,43 @@ public void reset()
public void finalOptimize(int bufferedBytes)
{
updateDirectConversionCandidates();
- convertLowCompressionStreams(bufferedBytes);
+ convertLowCompressionStreams(true, bufferedBytes);
}
- public void optimize(int bufferedBytes, int stripeRowCount)
+ @VisibleForTesting
+ boolean isUsefulCheckRequired(int dictionaryMemoryBytes)
{
- // recompute the dictionary memory usage
- dictionaryMemoryBytes = allWriters.stream()
- .filter(writer -> !writer.isDirectEncoded())
- .mapToInt(DictionaryColumnManager::getDictionaryBytes)
- .sum();
+ if (dictionaryMemoryBytes < dictionaryUsefulCheckColumnSizeBytes) {
+ return false;
+ }
- // update the dictionary growth history
- allWriters.stream()
- .filter(writer -> !writer.isDirectEncoded())
- .forEach(column -> column.updateHistory(stripeRowCount));
+ dictionaryUsefulCheckCounter++;
+ if (dictionaryUsefulCheckCounter == dictionaryUsefulCheckPerChunkFrequency) {
+ dictionaryUsefulCheckCounter = 0;
+ return true;
+ }
- if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow) {
- return;
+ return false;
+ }
+
+ public void optimize(int bufferedBytes, int stripeRowCount)
+ {
+ // recompute the dictionary memory usage
+ int totalDictionaryBytes = 0;
+ for (DictionaryColumnManager writer : allWriters) {
+ if (!writer.isDirectEncoded()) {
+ totalDictionaryBytes += writer.getDictionaryBytes();
+ writer.updateHistory(stripeRowCount);
+ }
}
+ dictionaryMemoryBytes = totalDictionaryBytes;
- updateDirectConversionCandidates();
+ boolean isDictionaryAlmostFull = dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow;
- // before any further checks, convert all low compression streams
- bufferedBytes = convertLowCompressionStreams(bufferedBytes);
+ if (isDictionaryAlmostFull || isUsefulCheckRequired(dictionaryMemoryBytes)) {
+ updateDirectConversionCandidates();
+ bufferedBytes = convertLowCompressionStreams(isDictionaryAlmostFull, bufferedBytes);
+ }
if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow || bufferedBytes >= stripeMaxBytes) {
return;
@@ -161,7 +214,7 @@ private void optimizeDictionaryColumns(int stripeRowCount, BufferedBytesCounter
return;
}
- // if the stripe is larger then the minimum stripe size, we are not required to convert any more dictionary columns to direct
+ // if the stripe is larger than the minimum stripe size, we are not required to convert any more dictionary columns to direct
if (bufferedBytesCounter.getBufferedBytes() >= stripeMinBytes) {
// check if we can get better compression by converting a dictionary column to direct. This can happen when there are multiple
// dictionary columns and one does not compress well, so if we convert it to direct we can continue to use the existing dictionaries
@@ -196,20 +249,22 @@ private boolean convertDictionaryColumn(BufferedBytesCounter bufferedBytesCounte
}
@VisibleForTesting
- int convertLowCompressionStreams(int bufferedBytes)
+ int convertLowCompressionStreams(boolean tryAllStreams, int bufferedBytes)
{
// convert all low compression column to direct
Iterator<DictionaryColumnManager> iterator = directConversionCandidates.iterator();
while (iterator.hasNext()) {
DictionaryColumnManager dictionaryWriter = iterator.next();
- if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) {
- int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes());
- OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes));
- iterator.remove();
- if (directBytes.isPresent()) {
- bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes;
- if (bufferedBytes >= stripeMaxBytes) {
- return bufferedBytes;
+ if (tryAllStreams || dictionaryWriter.getDictionaryBytes() >= dictionaryUsefulCheckColumnSizeBytes) {
+ if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) {
+ int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes());
+ OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes));
+ iterator.remove();
+ if (directBytes.isPresent()) {
+ bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes;
+ if (bufferedBytes >= stripeMaxBytes) {
+ return bufferedBytes;
+ }
}
}
}
@@ -217,6 +272,12 @@ int convertLowCompressionStreams(int bufferedBytes)
return bufferedBytes;
}
+ @VisibleForTesting
+ List<DictionaryColumnManager> getDirectConversionCandidates()
+ {
+ return directConversionCandidates;
+ }
+
private void updateDirectConversionCandidates()
{
// Writers can switch to Direct encoding internally. Remove them from direct conversion candidates.
@@ -255,14 +316,14 @@ private double currentCompressionRatio(int totalNonDictionaryBytes)
}
/**
- * Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ration
+ * Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ratio
* of the stripe if a single column is flipped to direct. So for each column, we try to predict the row count
* when we will hit a stripe flush limit if that column were converted to direct. Once we know the row count, we
* calculate the predicted compression ratio.
*
* @param totalNonDictionaryBytes current size of the stripe without dictionary columns
* @param stripeRowCount current number of rows in the stripe
- * @return the column that would produce the best stripe compression ration if converted to direct
+ * @return the column that would produce the best stripe compression ratio if converted to direct
*/
private DictionaryCompressionProjection selectDictionaryColumnToConvert(int totalNonDictionaryBytes, int stripeRowCount)
{
@@ -305,7 +366,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota
long currentIndexBytes = totalDictionaryIndexBytes - column.getIndexBytes();
long currentTotalBytes = currentRawBytes + currentDictionaryBytes + currentIndexBytes;
- // estimate the size of each new row if we were convert this column to direct
+ // estimate the size of each new row if we were to convert this column to direct
double rawBytesPerFutureRow = totalNonDictionaryBytesPerRow + column.getRawBytesPerRow();
double dictionaryBytesPerFutureRow = totalDictionaryBytesPerNewRow - column.getDictionaryBytesPerFutureRow();
double indexBytesPerFutureRow = totalDictionaryIndexBytesPerRow - column.getIndexBytesPerRow();
@@ -317,7 +378,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota
long rowsToStripeRowLimit = stripeMaxRowCount - stripeRowCount;
long rowsToLimit = Longs.min(rowsToDictionaryMemoryLimit, rowsToStripeMemoryLimit, rowsToStripeRowLimit);
- // predict the compression ratio at that limit if we were convert this column to direct
+ // predict the compression ratio at that limit if we were to convert this column to direct
long predictedUncompressedSizeAtLimit = totalNonDictionaryBytes + totalDictionaryRawBytes + (totalUncompressedBytesPerRow * rowsToLimit);
long predictedCompressedSizeAtLimit = (long) (currentTotalBytes + (totalBytesPerFutureRow * rowsToLimit));
double predictedCompressionRatioAtLimit = 1.0 * predictedUncompressedSizeAtLimit / predictedCompressedSizeAtLimit;
@@ -371,7 +432,8 @@ public interface DictionaryColumn
boolean isDirectEncoded();
}
- private static class DictionaryColumnManager
+ @VisibleForTesting
+ static class DictionaryColumnManager
{
private final DictionaryColumn dictionaryColumn;
@@ -481,6 +543,12 @@ public boolean isDirectEncoded()
{
return dictionaryColumn.isDirectEncoded();
}
+
+ @VisibleForTesting
+ public DictionaryColumn getDictionaryColumn()
+ {
+ return dictionaryColumn;
+ }
}
private static class DictionaryCompressionProjection
diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
index 6b49d976dc22..e4a9773834b2 100644
--- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
+++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
@@ -317,14 +317,18 @@ public OrcWriter(
}
}
this.columnWriters = columnWriters.build();
- this.dictionaryMaxMemoryBytes = toIntExact(
- requireNonNull(options.getDictionaryMaxMemory(), "dictionaryMaxMemory is null").toBytes());
+ this.dictionaryMaxMemoryBytes = toIntExact(options.getDictionaryMaxMemory().toBytes());
+ int dictionaryMemoryAlmostFullRangeBytes = toIntExact(options.getDictionaryMemoryAlmostFullRange().toBytes());
+ int dictionaryUsefulCheckColumnSizeBytes = toIntExact(options.getDictionaryUsefulCheckColumnSize().toBytes());
this.dictionaryCompressionOptimizer = new DictionaryCompressionOptimizer(
dictionaryColumnWriters.build(),
stripeMinBytes,
stripeMaxBytes,
stripeMaxRowCount,
- dictionaryMaxMemoryBytes);
+ dictionaryMaxMemoryBytes,
+ dictionaryMemoryAlmostFullRangeBytes,
+ dictionaryUsefulCheckColumnSizeBytes,
+ options.getDictionaryUsefulCheckPerChunkFrequency());
for (Entry<String, String> entry : this.userMetadata.entrySet()) {
recordValidation(validation -> validation.addMetadataProperty(entry.getKey(), utf8Slice(entry.getValue())));
diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java
index 11a5ec1559fa..8d17fe225929 100644
--- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java
+++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java
@@ -35,6 +35,9 @@ public class OrcWriterOptions
public static final int DEFAULT_STRIPE_MAX_ROW_COUNT = 10_000_000;
public static final int DEFAULT_ROW_GROUP_MAX_ROW_COUNT = 10_000;
public static final DataSize DEFAULT_DICTIONARY_MAX_MEMORY = new DataSize(16, MEGABYTE);
+ public static final DataSize DEFAULT_DICTIONARY_MEMORY_ALMOST_FULL_RANGE = new DataSize(4, MEGABYTE);
+ public static final int DEFAULT_DICTIONARY_USEFUL_CHECK_PER_CHUNK_FREQUENCY = Integer.MAX_VALUE;
+ public static final DataSize DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE = new DataSize(6, MEGABYTE);
public static final DataSize DEFAULT_MAX_STRING_STATISTICS_LIMIT = new DataSize(64, BYTE);
public static final DataSize DEFAULT_MAX_COMPRESSION_BUFFER_SIZE = new DataSize(256, KILOBYTE);
public static final DataSize DEFAULT_DWRF_STRIPE_CACHE_MAX_SIZE = new DataSize(8, MEGABYTE);
@@ -46,6 +49,9 @@ public class OrcWriterOptions
private final int stripeMaxRowCount;
private final int rowGroupMaxRowCount;
private final DataSize dictionaryMaxMemory;
+ private final DataSize dictionaryMemoryAlmostFullRange;
+ private final int dictionaryUsefulCheckPerChunkFrequency;
+ private final DataSize dictionaryUsefulCheckColumnSize;
private final DataSize maxStringStatisticsLimit;
private final DataSize maxCompressionBufferSize;
private final OptionalInt compressionLevel;
@@ -66,6 +72,9 @@ private OrcWriterOptions(
int stripeMaxRowCount,
int rowGroupMaxRowCount,
DataSize dictionaryMaxMemory,
+ DataSize dictionaryMemoryAlmostFullRange,
+ int dictionaryUsefulCheckPerChunkFrequency,
+ DataSize dictionaryUsefulCheckColumnSize,
DataSize maxStringStatisticsLimit,
DataSize maxCompressionBufferSize,
OptionalInt compressionLevel,
@@ -81,6 +90,8 @@ private OrcWriterOptions(
checkArgument(stripeMaxRowCount >= 1, "stripeMaxRowCount must be at least 1");
checkArgument(rowGroupMaxRowCount >= 1, "rowGroupMaxRowCount must be at least 1");
requireNonNull(dictionaryMaxMemory, "dictionaryMaxMemory is null");
+ requireNonNull(dictionaryMemoryAlmostFullRange, "dictionaryMemoryAlmostFullRange is null");
+ requireNonNull(dictionaryUsefulCheckColumnSize, "dictionaryUsefulCheckColumnSize is null");
requireNonNull(maxStringStatisticsLimit, "maxStringStatisticsLimit is null");
requireNonNull(maxCompressionBufferSize, "maxCompressionBufferSize is null");
requireNonNull(compressionLevel, "compressionLevel is null");
@@ -92,6 +103,9 @@ private OrcWriterOptions(
this.stripeMaxRowCount = stripeMaxRowCount;
this.rowGroupMaxRowCount = rowGroupMaxRowCount;
this.dictionaryMaxMemory = dictionaryMaxMemory;
+ this.dictionaryMemoryAlmostFullRange = dictionaryMemoryAlmostFullRange;
+ this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency;
+ this.dictionaryUsefulCheckColumnSize = dictionaryUsefulCheckColumnSize;
this.maxStringStatisticsLimit = maxStringStatisticsLimit;
this.maxCompressionBufferSize = maxCompressionBufferSize;
this.compressionLevel = compressionLevel;
@@ -128,6 +142,21 @@ public DataSize getDictionaryMaxMemory()
return dictionaryMaxMemory;
}
+ public DataSize getDictionaryMemoryAlmostFullRange()
+ {
+ return dictionaryMemoryAlmostFullRange;
+ }
+
+ public int getDictionaryUsefulCheckPerChunkFrequency()
+ {
+ return dictionaryUsefulCheckPerChunkFrequency;
+ }
+
+ public DataSize getDictionaryUsefulCheckColumnSize()
+ {
+ return dictionaryUsefulCheckColumnSize;
+ }
+
public DataSize getMaxStringStatisticsLimit()
{
return maxStringStatisticsLimit;
@@ -182,6 +211,9 @@ public String toString()
.add("stripeMaxRowCount", stripeMaxRowCount)
.add("rowGroupMaxRowCount", rowGroupMaxRowCount)
.add("dictionaryMaxMemory", dictionaryMaxMemory)
+ .add("dictionaryMemoryAlmostFullRange", dictionaryMemoryAlmostFullRange)
+ .add("dictionaryUsefulCheckPerChunkFrequency", dictionaryUsefulCheckPerChunkFrequency)
+ .add("dictionaryUsefulCheckColumnSize", dictionaryUsefulCheckColumnSize)
.add("maxStringStatisticsLimit", maxStringStatisticsLimit)
.add("maxCompressionBufferSize", maxCompressionBufferSize)
.add("compressionLevel", compressionLevel)
@@ -206,6 +238,9 @@ public static class Builder
private int stripeMaxRowCount = DEFAULT_STRIPE_MAX_ROW_COUNT;
private int rowGroupMaxRowCount = DEFAULT_ROW_GROUP_MAX_ROW_COUNT;
private DataSize dictionaryMaxMemory = DEFAULT_DICTIONARY_MAX_MEMORY;
+ private DataSize dictionaryMemoryAlmostFullRange = DEFAULT_DICTIONARY_MEMORY_ALMOST_FULL_RANGE;
+ private int dictionaryUsefulCheckPerChunkFrequency = DEFAULT_DICTIONARY_USEFUL_CHECK_PER_CHUNK_FREQUENCY;
+ private DataSize dictionaryUsefulCheckColumnSize = DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE;
private DataSize maxStringStatisticsLimit = DEFAULT_MAX_STRING_STATISTICS_LIMIT;
private DataSize maxCompressionBufferSize = DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
private OptionalInt compressionLevel = OptionalInt.empty();
@@ -250,6 +285,25 @@ public Builder withDictionaryMaxMemory(DataSize dictionaryMaxMemory)
return this;
}
+ public Builder withDictionaryMemoryAlmostFullRange(DataSize dictionaryMemoryAlmostFullRange)
+ {
+ this.dictionaryMemoryAlmostFullRange = requireNonNull(dictionaryMemoryAlmostFullRange, "dictionaryMemoryAlmostFullRange is null");
+ return this;
+ }
+
+ public Builder withDictionaryUsefulCheckPerChunkFrequency(int dictionaryUsefulCheckPerChunkFrequency)
+ {
+ checkArgument(dictionaryUsefulCheckPerChunkFrequency >= 0, "dictionaryUsefulCheckPerChunkFrequency is negative");
+ this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency;
+ return this;
+ }
+
+ public Builder withDictionaryUsefulCheckColumnSize(DataSize dictionaryUsefulCheckColumnSize)
+ {
+ this.dictionaryUsefulCheckColumnSize = requireNonNull(dictionaryUsefulCheckColumnSize, "dictionaryUsefulCheckColumnSize is null");
+ return this;
+ }
+
public Builder withMaxStringStatisticsLimit(DataSize maxStringStatisticsLimit)
{
this.maxStringStatisticsLimit = requireNonNull(maxStringStatisticsLimit, "maxStringStatisticsLimit is null");
@@ -332,6 +386,9 @@ public OrcWriterOptions build()
stripeMaxRowCount,
rowGroupMaxRowCount,
dictionaryMaxMemory,
+ dictionaryMemoryAlmostFullRange,
+ dictionaryUsefulCheckPerChunkFrequency,
+ dictionaryUsefulCheckColumnSize,
maxStringStatisticsLimit,
maxCompressionBufferSize,
compressionLevel,
diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java
index a82167897bd4..bd0fb7bd97ea 100644
--- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java
+++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java
@@ -18,6 +18,7 @@
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.Type;
+import com.facebook.presto.orc.DictionaryCompressionOptimizer.DictionaryColumnManager;
import com.facebook.presto.orc.metadata.ColumnEncoding;
import com.facebook.presto.orc.metadata.StripeFooter;
import com.facebook.presto.orc.writer.DictionaryColumnWriter;
@@ -892,8 +893,13 @@ void validate(int batchId, OrcWriter writer)
else {
// A successful conversion to direct changes the state of the dictionary compression
// optimizer, so conversions should only go through the optimizer.
- writer.getDictionaryCompressionOptimizer().convertLowCompressionStreams(bufferedBytes);
+ List<DictionaryColumnManager> directConversionCandidates = writer.getDictionaryCompressionOptimizer().getDirectConversionCandidates();
+ boolean contains = directConversionCandidates.stream().anyMatch(x -> x.getDictionaryColumn() == columnWriter);
+ assertTrue(contains);
+ writer.getDictionaryCompressionOptimizer().convertLowCompressionStreams(true, bufferedBytes);
assertTrue(columnWriter.isDirectEncoded(), "BatchId " + batchId + " bytes " + bufferedBytes);
+ contains = directConversionCandidates.stream().anyMatch(x -> x.getDictionaryColumn() == columnWriter);
+ assertFalse(contains);
}
index++;
}
diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java
index 49c8a2f3046a..ddd5e7e7890a 100644
--- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java
+++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java
@@ -15,6 +15,7 @@
import com.facebook.presto.orc.DictionaryCompressionOptimizer.DictionaryColumn;
import com.google.common.collect.ImmutableSet;
+import com.google.common.math.IntMath;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import org.testng.annotations.Test;
@@ -26,11 +27,11 @@
import static com.facebook.airlift.testing.Assertions.assertGreaterThanOrEqual;
import static com.facebook.airlift.testing.Assertions.assertLessThan;
-import static com.facebook.presto.orc.DictionaryCompressionOptimizer.DICTIONARY_MEMORY_MAX_RANGE;
import static com.facebook.presto.orc.DictionaryCompressionOptimizer.estimateIndexBytesPerValue;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
+import static java.math.RoundingMode.CEILING;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -38,6 +39,9 @@
public class TestDictionaryCompressionOptimizer
{
+ private static final int DICTIONARY_ALMOST_FULL_MEMORY_RANGE = megabytes(4);
+ private static final int CHUNK_ROW_COUNT = 1024;
+
@Test
public void testNoDictionariesBytesLimit()
{
@@ -152,7 +156,7 @@ public void testSingleDictionaryColumnMemoryLimit()
// construct a simulator that will hit the dictionary (low) memory limit by estimating the number of rows at the memory limit, and then setting large limits around this value
int stripeMaxBytes = megabytes(100);
- int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - (int) DICTIONARY_MEMORY_MAX_RANGE.toBytes();
+ int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE;
int expectedMaxRowCount = (int) (dictionaryMaxMemoryBytesLow / bytesPerEntry / uniquePercentage);
DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, expectedMaxRowCount * 2, dictionaryMaxMemoryBytes, 0, column);
@@ -276,7 +280,7 @@ public void testSingleDirectBytesLimit()
// construct a simulator that will flip the column to direct and then hit the bytes limit
int stripeMaxBytes = megabytes(100);
- int expectedRowCountAtFlip = (int) ((dictionaryMaxMemoryBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes()) / bytesPerEntry);
+ int expectedRowCountAtFlip = ((dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE) / bytesPerEntry);
int expectedMaxRowCountAtFull = stripeMaxBytes / bytesPerEntry;
DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, expectedMaxRowCountAtFull, dictionaryMaxMemoryBytes, 0, column);
@@ -379,7 +383,7 @@ public void testWideDictionaryAndNarrowDirectBytesLimit()
// construct a simulator that will flip the column to direct and then hit the bytes limit
int stripeMaxBytes = megabytes(2000);
- int dictionaryMaxMemoryBytesLow = (int) (dictionaryMaxMemoryBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes());
+ int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE;
int expectedRowCountAtFlip = (int) ((dictionaryMaxMemoryBytesLow - (dictionaryEntries * dictionaryBytesPerEntry)) / (directBytesPerEntry * directUniquePercentage));
int maxRowCount = 10_000_000;
DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, maxRowCount, dictionaryMaxMemoryBytes, 0, directColumn, dictionaryColumn);
@@ -421,6 +425,107 @@ public void testWideDictionaryAndNarrowDirectBytesLimit()
}
}
+ @Test
+ public void testDictionaryUsefulCheck()
+ {
+ int largeBytesPerEntry = 768;
+ TestDictionaryColumn largeDirectColumn = directColumn(largeBytesPerEntry, 1);
+ int smallBytePerEntry = 256;
+ TestDictionaryColumn smallDirectColumn = directColumn(smallBytePerEntry, 1);
+
+ int dictionaryUsefulCheckPerChunkFrequency = 5;
+ int dictionaryUsefulCheckColumnSizeBytes = megabytes(10);
+
+ // Set the stripe size high, so that the dictionaries are abandoned well before the stripe limits are reached.
+ int stripeMinBytes = megabytes(275);
+ int stripeMaxBytes = megabytes(300);
+
+ int dictionaryMaxMemoryBytes = megabytes(64);
+ int maxRowCount = 10_000_000;
+
+ // This test should abandon dictionaries due to the dictionary usefulness check. It has two columns: the
+ // first is 768 bytes per entry and the second is 256 bytes per entry, so each row is about 1 KB. When the
+ // large column's dictionary reaches 10 MB, it will be abandoned. After that, only the small column still
+ // has a dictionary; it will reach 10 MB and be abandoned as well. The constant
+ // (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT is added to the bounds because the check
+ // happens only once every dictionaryUsefulCheckPerChunkFrequency chunks.
+
+ int largeColumnAbandonRowCount = IntMath.divide(dictionaryUsefulCheckColumnSizeBytes, largeBytesPerEntry, CEILING);
+ int largeColumnAbandonUpperBound = largeColumnAbandonRowCount + (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT;
+ int smallColumnAbandonRowCount = IntMath.divide(dictionaryUsefulCheckColumnSizeBytes, smallBytePerEntry, CEILING);
+ int smallColumnAbandonUpperBound = smallColumnAbandonRowCount + (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT;
+
+ DataSimulator simulator = new DataSimulator(stripeMinBytes,
+ stripeMaxBytes,
+ maxRowCount,
+ dictionaryMaxMemoryBytes,
+ 0,
+ DICTIONARY_ALMOST_FULL_MEMORY_RANGE,
+ dictionaryUsefulCheckPerChunkFrequency,
+ dictionaryUsefulCheckColumnSizeBytes,
+ largeDirectColumn,
+ smallDirectColumn);
+
+ for (int loop = 0; loop < 3; loop++) {
+ assertFalse(simulator.isDictionaryMemoryFull());
+ assertFalse(largeDirectColumn.isDirectEncoded());
+ assertFalse(smallDirectColumn.isDirectEncoded());
+ assertEquals(simulator.getRowCount(), 0);
+ assertEquals(simulator.getBufferedBytes(), 0);
+
+ simulator.advanceToNextStateChange();
+
+ // the simulator should advance until the large column switches to direct encoding.
+ assertFalse(simulator.isDictionaryMemoryFull());
+ assertTrue(largeDirectColumn.isDirectEncoded());
+ assertFalse(smallDirectColumn.isDirectEncoded());
+ assertGreaterThanOrEqual(simulator.getRowCount(), largeColumnAbandonRowCount);
+ assertLessThan(simulator.getRowCount(), largeColumnAbandonUpperBound);
+ assertLessThan(simulator.getRowCount(), smallColumnAbandonRowCount);
+
+ simulator.advanceToNextStateChange();
+
+ // the simulator should advance until the small column also switches to direct encoding.
+ assertFalse(simulator.isDictionaryMemoryFull());
+ assertTrue(largeDirectColumn.isDirectEncoded());
+ assertTrue(smallDirectColumn.isDirectEncoded());
+ assertLessThan(simulator.getBufferedBytes(), (long) stripeMaxBytes);
+ assertGreaterThanOrEqual(simulator.getRowCount(), smallColumnAbandonRowCount);
+ assertLessThan(simulator.getRowCount(), smallColumnAbandonUpperBound);
+
+ simulator.finalOptimize();
+
+ assertFalse(simulator.isDictionaryMemoryFull());
+ simulator.reset();
+ }
+ }
+
+ @Test
+ public void testIsDictionaryUsefulCheckRequired()
+ {
+ TestDictionaryColumn directColumn = directColumn(1024, 1);
+ int dictionaryColumnSizeCheckBytes = megabytes(1);
+ int dictionaryUsefulCheckPerChunkFrequency = 3;
+ DataSimulator simulator = new DataSimulator(megabytes(100),
+ megabytes(200),
+ 10_000_000,
+ megabytes(100),
+ 0,
+ DICTIONARY_ALMOST_FULL_MEMORY_RANGE,
+ dictionaryUsefulCheckPerChunkFrequency,
+ dictionaryColumnSizeCheckBytes,
+ directColumn);
+
+ for (int loop = 0; loop < 3; loop++) {
+ assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes + 1));
+ assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes));
+ // Calling with 1 byte less should not increment the counter
+ assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes - 1));
+
+ // The third call at or above the threshold should return true, as dictionaryUsefulCheckPerChunkFrequency is set to 3.
+ assertTrue(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes));
+ }
+ }
+
private static int megabytes(int size)
{
return toIntExact(new DataSize(size, Unit.MEGABYTE).toBytes());
@@ -445,20 +550,50 @@ public DataSimulator(
int dictionaryMemoryMaxBytes,
int otherColumnsBytesPerRow,
TestDictionaryColumn... dictionaryColumns)
+ {
+ this(stripeMinBytes,
+ stripeMaxBytes,
+ stripeMaxRowCount,
+ dictionaryMemoryMaxBytes,
+ otherColumnsBytesPerRow,
+ DICTIONARY_ALMOST_FULL_MEMORY_RANGE,
+ dictionaryMemoryMaxBytes,
+ 0,
+ dictionaryColumns);
+ }
+
+ public DataSimulator(
+ int stripeMinBytes,
+ int stripeMaxBytes,
+ int stripeMaxRowCount,
+ int dictionaryMemoryMaxBytes,
+ int otherColumnsBytesPerRow,
+ int dictionaryAlmostFullRangeBytes,
+ int dictionaryUsefulCheckPerChunkFrequency,
+ int dictionaryUsefulCheckColumnSizeBytes,
+ TestDictionaryColumn... dictionaryColumns)
{
this.stripeMaxBytes = stripeMaxBytes;
this.stripeMaxRowCount = stripeMaxRowCount;
this.otherColumnsBytesPerRow = otherColumnsBytesPerRow;
this.dictionaryColumns = ImmutableSet.copyOf(dictionaryColumns);
- this.optimizer = new DictionaryCompressionOptimizer(this.dictionaryColumns, stripeMinBytes, stripeMaxBytes, stripeMaxRowCount, dictionaryMemoryMaxBytes);
+ this.optimizer = new DictionaryCompressionOptimizer(
+ this.dictionaryColumns,
+ stripeMinBytes,
+ stripeMaxBytes,
+ stripeMaxRowCount,
+ dictionaryMemoryMaxBytes,
+ dictionaryAlmostFullRangeBytes,
+ dictionaryUsefulCheckColumnSizeBytes,
+ dictionaryUsefulCheckPerChunkFrequency);
}
public void advanceToNextStateChange()
{
List<Boolean> directColumnFlags = getDirectColumnFlags();
while (!optimizer.isFull(getBufferedBytes()) && getBufferedBytes() < stripeMaxBytes && getRowCount() < stripeMaxRowCount && directColumnFlags.equals(getDirectColumnFlags())) {
- rowCount += 1024;
+ rowCount += CHUNK_ROW_COUNT;
for (TestDictionaryColumn dictionaryColumn : dictionaryColumns) {
dictionaryColumn.advanceTo(rowCount);
}
@@ -504,6 +639,11 @@ public int getRowCount()
{
return rowCount;
}
+
+ public boolean isUsefulCheckRequired(int dictionaryMemoryBytes)
+ {
+ return optimizer.isUsefulCheckRequired(dictionaryMemoryBytes);
+ }
}
private static TestDictionaryColumn directColumn(int bytesPerEntry, double uniquePercentage)
diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java
index 5ce2c385d137..c2503b0361bf 100644
--- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java
+++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java
@@ -61,6 +61,9 @@ public void testProperties()
int stripeMaxRowCount = 1_100_000;
int rowGroupMaxRowCount = 15_000;
DataSize dictionaryMaxMemory = new DataSize(13_000, KILOBYTE);
+ DataSize dictionaryMemoryRange = new DataSize(1_000, KILOBYTE);
+ int dictionaryUsefulCheckPerChunkFrequency = 9_999;
+ DataSize dictionaryUsefulCheckColumnSize = new DataSize(1, MEGABYTE);
DataSize stringMaxStatisticsLimit = new DataSize(128, BYTE);
DataSize maxCompressionBufferSize = new DataSize(512, KILOBYTE);
OptionalInt compressionLevel = OptionalInt.of(5);
@@ -75,6 +79,9 @@ public void testProperties()
.withStripeMaxRowCount(stripeMaxRowCount)
.withRowGroupMaxRowCount(rowGroupMaxRowCount)
.withDictionaryMaxMemory(dictionaryMaxMemory)
+ .withDictionaryMemoryAlmostFullRange(dictionaryMemoryRange)
+ .withDictionaryUsefulCheckPerChunkFrequency(dictionaryUsefulCheckPerChunkFrequency)
+ .withDictionaryUsefulCheckColumnSize(dictionaryUsefulCheckColumnSize)
.withMaxStringStatisticsLimit(stringMaxStatisticsLimit)
.withMaxCompressionBufferSize(maxCompressionBufferSize)
.withCompressionLevel(compressionLevel)
@@ -90,6 +97,9 @@ public void testProperties()
assertEquals(stripeMaxRowCount, options.getStripeMaxRowCount());
assertEquals(rowGroupMaxRowCount, options.getRowGroupMaxRowCount());
assertEquals(dictionaryMaxMemory, options.getDictionaryMaxMemory());
+ assertEquals(dictionaryMemoryRange, options.getDictionaryMemoryAlmostFullRange());
+ assertEquals(dictionaryUsefulCheckPerChunkFrequency, options.getDictionaryUsefulCheckPerChunkFrequency());
+ assertEquals(dictionaryUsefulCheckColumnSize, options.getDictionaryUsefulCheckColumnSize());
assertEquals(stringMaxStatisticsLimit, options.getMaxStringStatisticsLimit());
assertEquals(maxCompressionBufferSize, options.getMaxCompressionBufferSize());
assertEquals(compressionLevel, options.getCompressionLevel());
@@ -108,6 +118,9 @@ public void testToString()
int stripeMaxRowCount = 1_100_000;
int rowGroupMaxRowCount = 15_000;
DataSize dictionaryMaxMemory = new DataSize(13_000, KILOBYTE);
+ DataSize dictionaryMemoryRange = new DataSize(1_000, KILOBYTE);
+ int dictionaryUsefulCheckPerChunkFrequency = 9_999;
+ DataSize dictionaryUsefulCheckColumnSize = new DataSize(1, MEGABYTE);
DataSize stringMaxStatisticsLimit = new DataSize(128, BYTE);
DataSize maxCompressionBufferSize = new DataSize(512, KILOBYTE);
DataSize dwrfStripeCacheMaxSize = new DataSize(4, MEGABYTE);
@@ -124,6 +137,9 @@ public void testToString()
.withStripeMaxRowCount(stripeMaxRowCount)
.withRowGroupMaxRowCount(rowGroupMaxRowCount)
.withDictionaryMaxMemory(dictionaryMaxMemory)
+ .withDictionaryMemoryAlmostFullRange(dictionaryMemoryRange)
+ .withDictionaryUsefulCheckPerChunkFrequency(dictionaryUsefulCheckPerChunkFrequency)
+ .withDictionaryUsefulCheckColumnSize(dictionaryUsefulCheckColumnSize)
.withMaxStringStatisticsLimit(stringMaxStatisticsLimit)
.withMaxCompressionBufferSize(maxCompressionBufferSize)
.withCompressionLevel(compressionLevel)
@@ -136,12 +152,12 @@ public void testToString()
.withPreserveDirectEncodingStripeCount(preserveDirectEncodingStripeCount)
.build();
- String expectedString = "OrcWriterOptions{stripeMinSize=13MB, stripeMaxSize=27MB, stripeMaxRowCount=1100000, "
- + "rowGroupMaxRowCount=15000, dictionaryMaxMemory=13000kB, maxStringStatisticsLimit=128B, "
- + "maxCompressionBufferSize=512kB, compressionLevel=OptionalInt[5], streamLayout=ByColumnSize{}, "
- + "integerDictionaryEncodingEnabled=false, stringDictionarySortingEnabled=true, "
- + "dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], "
- + "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0}";
+ String expectedString = "OrcWriterOptions{stripeMinSize=13MB, stripeMaxSize=27MB, stripeMaxRowCount=1100000, rowGroupMaxRowCount=15000, " +
+ "dictionaryMaxMemory=13000kB, dictionaryMemoryAlmostFullRange=1000kB, dictionaryUsefulCheckPerChunkFrequency=9999, " +
+ "dictionaryUsefulCheckColumnSize=1MB, maxStringStatisticsLimit=128B, maxCompressionBufferSize=512kB, " +
+ "compressionLevel=OptionalInt[5], streamLayout=ByColumnSize{}, integerDictionaryEncodingEnabled=false, " +
+ "stringDictionarySortingEnabled=true, dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " +
+ "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0}";
assertEquals(expectedString, writerOptions.toString());
}
}