From 699832f664f02a9d1afb661d68d55dfb2a5ce3db Mon Sep 17 00:00:00 2001 From: Arunachalam Thirupathi Date: Mon, 1 Nov 2021 13:12:45 -0700 Subject: [PATCH] Support Large Dictionary for OrcWriter OrcWriter uses dictionary encoding for all columns until the writer's total dictionary memory exceeds the dictionaryMaxMemory - 4MB. Then starts abandoning the dictionary encodings. When running with large dictionary sizes (say 80 MB), and using long dictionary, the dictionary writer could retain 100's of MB before it will be abandoned. This change introduces new configuration parameters to control this behavior. 1. Make the 4 MB threshold when dictionary is almost full configurable. Large dictionary can configure this to something bigger. 2. When a dictionary column exceeds a certain dictionary size, measure if dictionary is effective and abandon it if it is not. 3. The setting 2 could affect existing writers, so introduce a 3rd setting on how often to do the dictionary effectiveness check. It is configured to INT_MAX to preserve existing behavior. --- .../orc/DictionaryCompressionOptimizer.java | 150 ++++++++++++----- .../com/facebook/presto/orc/OrcWriter.java | 10 +- .../facebook/presto/orc/OrcWriterOptions.java | 57 +++++++ .../orc/TestDictionaryColumnWriter.java | 8 +- .../TestDictionaryCompressionOptimizer.java | 152 +++++++++++++++++- .../presto/orc/TestOrcWriterOptions.java | 28 +++- 6 files changed, 348 insertions(+), 57 deletions(-) diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java b/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java index dab1e3df2e98..34f1e6172cbe 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java @@ -16,7 +16,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import io.airlift.units.DataSize; -import io.airlift.units.DataSize.Unit; import java.util.ArrayList; import java.util.Iterator; @@ -28,18 +27,48 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; +/** + * DictionaryCompressionOptimizer has 2 objectives: + * 1) Bound the dictionary memory of the reader, when all columns are read. Reader's dictionary memory + * should not exceed the dictionaryMemoryMaxBytesHigh. + * 2) When dictionary encoding for a column produces size comparable to the direct encoding, choose + * direct encoding over dictionary encoding. Dictionary encoding/decoding is memory and CPU intensive, + * so for comparable column sizes, direct encoding is mostly better. + *

+ * Note: Dictionary writer might use more memory as they over-allocate dictionary sizes as the writers + * build dictionary as they see new data. The hash tables implementation in the dictionary writer's allocate + * hash buckets in power of 2. So after a million entries, the overallocation consumes large amount of memory. + *

+ * DictionaryCompressionOptimizer functionality can be controlled by the following configs to the constructor. + *

+ * 1. dictionaryMemoryMaxBytes -> Max size of the dictionary when all columns are read. Note: Writer + * might consume more memory due to the over-allocation. + *

+ * 2. dictionaryMemoryAlmostFullRangeBytes -> When the dictionary size exceeds dictionaryMaxMemoryBytes + * dictionary columns will be converted to direct to reduce the dictionary size. By setting a range + * the stripe can be flushed, before the dictionary is full. When dictionary size is higher than + * (dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes), it is considered almost full + * and is ready for flushing. This setting is defined as a delta on dictionaryMemoryMaxBytes for backward compatibility. + *

+ * 3. dictionaryUsefulCheckColumnSizeBytes -> Columns start with dictionary encoding and when the dictionary memory + * is almost full, usefulness of the dictionary is measured. For large dictionaries (> 40 MB) the check + * might happen very late and large dictionary might cause writer to OOM due to writer over allocating for + * dictionary growth. When a dictionary for a column grows beyond the dictionaryUsefulCheckColumnSizeBytes the + * dictionary usefulness check will be performed and if dictionary is not useful, it will be converted to direct. + *

+ * 4. dictionaryUsefulCheckPerChunkFrequency -> dictionaryUsefulCheck could be costly if performed on every chunk. + * The dictionaryUsefulCheck will be performed when a column dictionary is above the dictionaryUsefulCheckColumnSizeBytes + * and per every dictionaryUsefulCheckPerChunkFrequency chunks written. + */ public class DictionaryCompressionOptimizer { private static final double DICTIONARY_MIN_COMPRESSION_RATIO = 1.25; - // Instead of waiting for the dictionary to fill completely, which would force a column into - // direct mode, close the stripe early assuming it has hit the minimum row count. - static final DataSize DICTIONARY_MEMORY_MAX_RANGE = new DataSize(4, Unit.MEGABYTE); - - static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, Unit.MEGABYTE); + static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, MEGABYTE); private final List allWriters; private final List directConversionCandidates = new ArrayList<>(); @@ -49,15 +78,21 @@ public class DictionaryCompressionOptimizer private final int stripeMaxRowCount; private final int dictionaryMemoryMaxBytesLow; private final int dictionaryMemoryMaxBytesHigh; + private final int dictionaryUsefulCheckColumnSizeBytes; + private final int dictionaryUsefulCheckPerChunkFrequency; private int dictionaryMemoryBytes; + private int dictionaryUsefulCheckCounter; public DictionaryCompressionOptimizer( Set writers, int stripeMinBytes, int stripeMaxBytes, int stripeMaxRowCount, - int dictionaryMemoryMaxBytes) + int dictionaryMemoryMaxBytes, + int dictionaryMemoryAlmostFullRangeBytes, + int dictionaryUsefulCheckColumnSizeBytes, + int dictionaryUsefulCheckPerChunkFrequency) { requireNonNull(writers, "writers is null"); this.allWriters = writers.stream() @@ -74,9 +109,14 @@ public DictionaryCompressionOptimizer( this.stripeMaxRowCount = stripeMaxRowCount; checkArgument(dictionaryMemoryMaxBytes >= 0, "dictionaryMemoryMaxBytes is negative"); + checkArgument(dictionaryMemoryAlmostFullRangeBytes >= 0, "dictionaryMemoryRangeBytes is negative"); this.dictionaryMemoryMaxBytesHigh = dictionaryMemoryMaxBytes; - this.dictionaryMemoryMaxBytesLow = (int) Math.max(dictionaryMemoryMaxBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes(), 0); + this.dictionaryMemoryMaxBytesLow = Math.max(dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes, 0); + checkArgument(dictionaryUsefulCheckPerChunkFrequency >= 0, "dictionaryUsefulCheckPerChunkFrequency is negative"); + this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency; + + this.dictionaryUsefulCheckColumnSizeBytes = dictionaryUsefulCheckColumnSizeBytes; directConversionCandidates.addAll(allWriters); } @@ -87,12 +127,12 @@ public int getDictionaryMemoryBytes() public boolean isFull(long bufferedBytes) { - // if the strip is big enough to flush, stop before we hit the absolute max, so we are + // if the stripe is big enough to flush, stop before we hit the absolute max, so we are // not forced to convert a dictionary to direct to fit in memory if (bufferedBytes > stripeMinBytes) { return dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow; } - // strip is small, grow to the high water mark (so at the very least we have more information) + // stripe is small, grow to the high watermark (so at the very least we have more information) return dictionaryMemoryBytes > dictionaryMemoryMaxBytesHigh; } @@ -107,30 +147,43 @@ public void reset() public void finalOptimize(int bufferedBytes) { updateDirectConversionCandidates(); - convertLowCompressionStreams(bufferedBytes); + convertLowCompressionStreams(true, bufferedBytes); } - public void optimize(int bufferedBytes, int stripeRowCount) + @VisibleForTesting + boolean isUsefulCheckRequired(int dictionaryMemoryBytes) { - // recompute the dictionary memory usage - dictionaryMemoryBytes = allWriters.stream() - .filter(writer -> !writer.isDirectEncoded()) - .mapToInt(DictionaryColumnManager::getDictionaryBytes) - .sum(); + if (dictionaryMemoryBytes < dictionaryUsefulCheckColumnSizeBytes) { + return false; + } - // update the dictionary growth history - allWriters.stream() - .filter(writer -> !writer.isDirectEncoded()) - .forEach(column -> column.updateHistory(stripeRowCount)); + dictionaryUsefulCheckCounter++; + if (dictionaryUsefulCheckCounter == dictionaryUsefulCheckPerChunkFrequency) { + dictionaryUsefulCheckCounter = 0; + return true; + } - if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow) { - return; + return false; + } + + public void optimize(int bufferedBytes, int stripeRowCount) + { + // recompute the dictionary memory usage + int totalDictionaryBytes = 0; + for (DictionaryColumnManager writer : allWriters) { + if (!writer.isDirectEncoded()) { + totalDictionaryBytes += writer.getDictionaryBytes(); + writer.updateHistory(stripeRowCount); + } } + dictionaryMemoryBytes = totalDictionaryBytes; - updateDirectConversionCandidates(); + boolean isDictionaryAlmostFull = dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow; - // before any further checks, convert all low compression streams - bufferedBytes = convertLowCompressionStreams(bufferedBytes); + if (isDictionaryAlmostFull || isUsefulCheckRequired(dictionaryMemoryBytes)) { + updateDirectConversionCandidates(); + bufferedBytes = convertLowCompressionStreams(isDictionaryAlmostFull, bufferedBytes); + } if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow || bufferedBytes >= stripeMaxBytes) { return; @@ -161,7 +214,7 @@ private void optimizeDictionaryColumns(int stripeRowCount, BufferedBytesCounter return; } - // if the stripe is larger then the minimum stripe size, we are not required to convert any more dictionary columns to direct + // if the stripe is larger than the minimum stripe size, we are not required to convert any more dictionary columns to direct if (bufferedBytesCounter.getBufferedBytes() >= stripeMinBytes) { // check if we can get better compression by converting a dictionary column to direct. This can happen when then there are multiple // dictionary columns and one does not compress well, so if we convert it to direct we can continue to use the existing dictionaries @@ -196,20 +249,22 @@ private boolean convertDictionaryColumn(BufferedBytesCounter bufferedBytesCounte } @VisibleForTesting - int convertLowCompressionStreams(int bufferedBytes) + int convertLowCompressionStreams(boolean tryAllStreams, int bufferedBytes) { // convert all low compression column to direct Iterator iterator = directConversionCandidates.iterator(); while (iterator.hasNext()) { DictionaryColumnManager dictionaryWriter = iterator.next(); - if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) { - int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes()); - OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes)); - iterator.remove(); - if (directBytes.isPresent()) { - bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes; - if (bufferedBytes >= stripeMaxBytes) { - return bufferedBytes; + if (tryAllStreams || dictionaryWriter.getDictionaryBytes() >= dictionaryUsefulCheckColumnSizeBytes) { + if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) { + int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes()); + OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes)); + iterator.remove(); + if (directBytes.isPresent()) { + bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes; + if (bufferedBytes >= stripeMaxBytes) { + return bufferedBytes; + } } } } @@ -217,6 +272,12 @@ int convertLowCompressionStreams(int bufferedBytes) return bufferedBytes; } + @VisibleForTesting + List getDirectConversionCandidates() + { + return directConversionCandidates; + } + private void updateDirectConversionCandidates() { // Writers can switch to Direct encoding internally. Remove them from direct conversion candidates. @@ -255,14 +316,14 @@ private double currentCompressionRatio(int totalNonDictionaryBytes) } /** - * Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ration + * Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ratio * of the stripe if a singe column is flipped to direct. So for each column, we try to predict the row count * when we will hit a stripe flush limit if that column were converted to direct. Once we know the row count, we * calculate the predicted compression ratio. * * @param totalNonDictionaryBytes current size of the stripe without non-dictionary columns * @param stripeRowCount current number of rows in the stripe - * @return the column that would produce the best stripe compression ration if converted to direct + * @return the column that would produce the best stripe compression ratio if converted to direct */ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int totalNonDictionaryBytes, int stripeRowCount) { @@ -305,7 +366,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota long currentIndexBytes = totalDictionaryIndexBytes - column.getIndexBytes(); long currentTotalBytes = currentRawBytes + currentDictionaryBytes + currentIndexBytes; - // estimate the size of each new row if we were convert this column to direct + // estimate the size of each new row if we were to convert this column to direct double rawBytesPerFutureRow = totalNonDictionaryBytesPerRow + column.getRawBytesPerRow(); double dictionaryBytesPerFutureRow = totalDictionaryBytesPerNewRow - column.getDictionaryBytesPerFutureRow(); double indexBytesPerFutureRow = totalDictionaryIndexBytesPerRow - column.getIndexBytesPerRow(); @@ -317,7 +378,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota long rowsToStripeRowLimit = stripeMaxRowCount - stripeRowCount; long rowsToLimit = Longs.min(rowsToDictionaryMemoryLimit, rowsToStripeMemoryLimit, rowsToStripeRowLimit); - // predict the compression ratio at that limit if we were convert this column to direct + // predict the compression ratio at that limit if we were to convert this column to direct long predictedUncompressedSizeAtLimit = totalNonDictionaryBytes + totalDictionaryRawBytes + (totalUncompressedBytesPerRow * rowsToLimit); long predictedCompressedSizeAtLimit = (long) (currentTotalBytes + (totalBytesPerFutureRow * rowsToLimit)); double predictedCompressionRatioAtLimit = 1.0 * predictedUncompressedSizeAtLimit / predictedCompressedSizeAtLimit; @@ -371,7 +432,8 @@ public interface DictionaryColumn boolean isDirectEncoded(); } - private static class DictionaryColumnManager + @VisibleForTesting + static class DictionaryColumnManager { private final DictionaryColumn dictionaryColumn; @@ -481,6 +543,12 @@ public boolean isDirectEncoded() { return dictionaryColumn.isDirectEncoded(); } + + @VisibleForTesting + public DictionaryColumn getDictionaryColumn() + { + return dictionaryColumn; + } } private static class DictionaryCompressionProjection diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java index 6b49d976dc22..e4a9773834b2 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java @@ -317,14 +317,18 @@ public OrcWriter( } } this.columnWriters = columnWriters.build(); - this.dictionaryMaxMemoryBytes = toIntExact( - requireNonNull(options.getDictionaryMaxMemory(), "dictionaryMaxMemory is null").toBytes()); + this.dictionaryMaxMemoryBytes = toIntExact(options.getDictionaryMaxMemory().toBytes()); + int dictionaryMemoryAlmostFullRangeBytes = toIntExact(options.getDictionaryMemoryAlmostFullRange().toBytes()); + int dictionaryUsefulCheckColumnSizeBytes = toIntExact(options.getDictionaryUsefulCheckColumnSize().toBytes()); this.dictionaryCompressionOptimizer = new DictionaryCompressionOptimizer( dictionaryColumnWriters.build(), stripeMinBytes, stripeMaxBytes, stripeMaxRowCount, - dictionaryMaxMemoryBytes); + dictionaryMaxMemoryBytes, + dictionaryMemoryAlmostFullRangeBytes, + dictionaryUsefulCheckColumnSizeBytes, + options.getDictionaryUsefulCheckPerChunkFrequency()); for (Entry entry : this.userMetadata.entrySet()) { recordValidation(validation -> validation.addMetadataProperty(entry.getKey(), utf8Slice(entry.getValue()))); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java index 11a5ec1559fa..8d17fe225929 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java @@ -35,6 +35,9 @@ public class OrcWriterOptions public static final int DEFAULT_STRIPE_MAX_ROW_COUNT = 10_000_000; public static final int DEFAULT_ROW_GROUP_MAX_ROW_COUNT = 10_000; public static final DataSize DEFAULT_DICTIONARY_MAX_MEMORY = new DataSize(16, MEGABYTE); + public static final DataSize DEFAULT_DICTIONARY_MEMORY_ALMOST_FULL_RANGE = new DataSize(4, MEGABYTE); + public static final int DEFAULT_DICTIONARY_USEFUL_CHECK_PER_CHUNK_FREQUENCY = Integer.MAX_VALUE; + public static final DataSize DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE = new DataSize(6, MEGABYTE); public static final DataSize DEFAULT_MAX_STRING_STATISTICS_LIMIT = new DataSize(64, BYTE); public static final DataSize DEFAULT_MAX_COMPRESSION_BUFFER_SIZE = new DataSize(256, KILOBYTE); public static final DataSize DEFAULT_DWRF_STRIPE_CACHE_MAX_SIZE = new DataSize(8, MEGABYTE); @@ -46,6 +49,9 @@ public class OrcWriterOptions private final int stripeMaxRowCount; private final int rowGroupMaxRowCount; private final DataSize dictionaryMaxMemory; + private final DataSize dictionaryMemoryAlmostFullRange; + private final int dictionaryUsefulCheckPerChunkFrequency; + private final DataSize dictionaryUsefulCheckColumnSize; private final DataSize maxStringStatisticsLimit; private final DataSize maxCompressionBufferSize; private final OptionalInt compressionLevel; @@ -66,6 +72,9 @@ private OrcWriterOptions( int stripeMaxRowCount, int rowGroupMaxRowCount, DataSize dictionaryMaxMemory, + DataSize dictionaryMemoryAlmostFullRange, + int dictionaryUsefulCheckPerChunkFrequency, + DataSize dictionaryUsefulCheckColumnSize, DataSize maxStringStatisticsLimit, DataSize maxCompressionBufferSize, OptionalInt compressionLevel, @@ -81,6 +90,8 @@ private OrcWriterOptions( checkArgument(stripeMaxRowCount >= 1, "stripeMaxRowCount must be at least 1"); checkArgument(rowGroupMaxRowCount >= 1, "rowGroupMaxRowCount must be at least 1"); requireNonNull(dictionaryMaxMemory, "dictionaryMaxMemory is null"); + requireNonNull(dictionaryMemoryAlmostFullRange, "dictionaryMemoryAlmostFullRange is null"); + requireNonNull(dictionaryUsefulCheckColumnSize, "dictionaryUsefulCheckColumnSize is null"); requireNonNull(maxStringStatisticsLimit, "maxStringStatisticsLimit is null"); requireNonNull(maxCompressionBufferSize, "maxCompressionBufferSize is null"); requireNonNull(compressionLevel, "compressionLevel is null"); @@ -92,6 +103,9 @@ private OrcWriterOptions( this.stripeMaxRowCount = stripeMaxRowCount; this.rowGroupMaxRowCount = rowGroupMaxRowCount; this.dictionaryMaxMemory = dictionaryMaxMemory; + this.dictionaryMemoryAlmostFullRange = dictionaryMemoryAlmostFullRange; + this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency; + this.dictionaryUsefulCheckColumnSize = dictionaryUsefulCheckColumnSize; this.maxStringStatisticsLimit = maxStringStatisticsLimit; this.maxCompressionBufferSize = maxCompressionBufferSize; this.compressionLevel = compressionLevel; @@ -128,6 +142,21 @@ public DataSize getDictionaryMaxMemory() return dictionaryMaxMemory; } + public DataSize getDictionaryMemoryAlmostFullRange() + { + return dictionaryMemoryAlmostFullRange; + } + + public int getDictionaryUsefulCheckPerChunkFrequency() + { + return dictionaryUsefulCheckPerChunkFrequency; + } + + public DataSize getDictionaryUsefulCheckColumnSize() + { + return dictionaryUsefulCheckColumnSize; + } + public DataSize getMaxStringStatisticsLimit() { return maxStringStatisticsLimit; @@ -182,6 +211,9 @@ public String toString() .add("stripeMaxRowCount", stripeMaxRowCount) .add("rowGroupMaxRowCount", rowGroupMaxRowCount) .add("dictionaryMaxMemory", dictionaryMaxMemory) + .add("dictionaryMemoryAlmostFullRange", dictionaryMemoryAlmostFullRange) + .add("dictionaryUsefulCheckPerChunkFrequency", dictionaryUsefulCheckPerChunkFrequency) + .add("dictionaryUsefulCheckColumnSize", dictionaryUsefulCheckColumnSize) .add("maxStringStatisticsLimit", maxStringStatisticsLimit) .add("maxCompressionBufferSize", maxCompressionBufferSize) .add("compressionLevel", compressionLevel) @@ -206,6 +238,9 @@ public static class Builder private int stripeMaxRowCount = DEFAULT_STRIPE_MAX_ROW_COUNT; private int rowGroupMaxRowCount = DEFAULT_ROW_GROUP_MAX_ROW_COUNT; private DataSize dictionaryMaxMemory = DEFAULT_DICTIONARY_MAX_MEMORY; + private DataSize dictionaryMemoryAlmostFullRange = DEFAULT_DICTIONARY_MEMORY_ALMOST_FULL_RANGE; + private int dictionaryUsefulCheckPerChunkFrequency = DEFAULT_DICTIONARY_USEFUL_CHECK_PER_CHUNK_FREQUENCY; + private DataSize dictionaryUsefulCheckColumnSize = DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE; private DataSize maxStringStatisticsLimit = DEFAULT_MAX_STRING_STATISTICS_LIMIT; private DataSize maxCompressionBufferSize = DEFAULT_MAX_COMPRESSION_BUFFER_SIZE; private OptionalInt compressionLevel = OptionalInt.empty(); @@ -250,6 +285,25 @@ public Builder withDictionaryMaxMemory(DataSize dictionaryMaxMemory) return this; } + public Builder withDictionaryMemoryAlmostFullRange(DataSize dictionaryMemoryAlmostFullRange) + { + this.dictionaryMemoryAlmostFullRange = requireNonNull(dictionaryMemoryAlmostFullRange, "dictionaryMemoryAlmostFullRange is null"); + return this; + } + + public Builder withDictionaryUsefulCheckPerChunkFrequency(int dictionaryUsefulCheckPerChunkFrequency) + { + checkArgument(dictionaryUsefulCheckPerChunkFrequency >= 0, "dictionaryUsefulCheckPerChunkFrequency is negative"); + this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency; + return this; + } + + public Builder withDictionaryUsefulCheckColumnSize(DataSize dictionaryUsefulCheckColumnSize) + { + this.dictionaryUsefulCheckColumnSize = requireNonNull(dictionaryUsefulCheckColumnSize, "dictionaryUsefulCheckColumnSize is null"); + return this; + } + public Builder withMaxStringStatisticsLimit(DataSize maxStringStatisticsLimit) { this.maxStringStatisticsLimit = requireNonNull(maxStringStatisticsLimit, "maxStringStatisticsLimit is null"); @@ -332,6 +386,9 @@ public OrcWriterOptions build() stripeMaxRowCount, rowGroupMaxRowCount, dictionaryMaxMemory, + dictionaryMemoryAlmostFullRange, + dictionaryUsefulCheckPerChunkFrequency, + dictionaryUsefulCheckColumnSize, maxStringStatisticsLimit, maxCompressionBufferSize, compressionLevel, diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java index a82167897bd4..bd0fb7bd97ea 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java @@ -18,6 +18,7 @@ import com.facebook.presto.common.block.BlockBuilder; import com.facebook.presto.common.type.ArrayType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.orc.DictionaryCompressionOptimizer.DictionaryColumnManager; import com.facebook.presto.orc.metadata.ColumnEncoding; import com.facebook.presto.orc.metadata.StripeFooter; import com.facebook.presto.orc.writer.DictionaryColumnWriter; @@ -892,8 +893,13 @@ void validate(int batchId, OrcWriter writer) else { // Successful conversion to direct, changes the state of the dictionary compression // optimizer and it should go only via dictionary compression optimizer. - writer.getDictionaryCompressionOptimizer().convertLowCompressionStreams(bufferedBytes); + List directConversionCandidates = writer.getDictionaryCompressionOptimizer().getDirectConversionCandidates(); + boolean contains = directConversionCandidates.stream().anyMatch(x -> x.getDictionaryColumn() == columnWriter); + assertTrue(contains); + writer.getDictionaryCompressionOptimizer().convertLowCompressionStreams(true, bufferedBytes); assertTrue(columnWriter.isDirectEncoded(), "BatchId " + batchId + " bytes " + bufferedBytes); + contains = directConversionCandidates.stream().anyMatch(x -> x.getDictionaryColumn() == columnWriter); + assertFalse(contains); } index++; } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java index 49c8a2f3046a..ddd5e7e7890a 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryCompressionOptimizer.java @@ -15,6 +15,7 @@ import com.facebook.presto.orc.DictionaryCompressionOptimizer.DictionaryColumn; import com.google.common.collect.ImmutableSet; +import com.google.common.math.IntMath; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; import org.testng.annotations.Test; @@ -26,11 +27,11 @@ import static com.facebook.airlift.testing.Assertions.assertGreaterThanOrEqual; import static com.facebook.airlift.testing.Assertions.assertLessThan; -import static com.facebook.presto.orc.DictionaryCompressionOptimizer.DICTIONARY_MEMORY_MAX_RANGE; import static com.facebook.presto.orc.DictionaryCompressionOptimizer.estimateIndexBytesPerValue; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; import static java.lang.Math.toIntExact; +import static java.math.RoundingMode.CEILING; import static java.util.Objects.requireNonNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -38,6 +39,9 @@ public class TestDictionaryCompressionOptimizer { + private static final int DICTIONARY_ALMOST_FULL_MEMORY_RANGE = megabytes(4); + private static final int CHUNK_ROW_COUNT = 1024; + @Test public void testNoDictionariesBytesLimit() { @@ -152,7 +156,7 @@ public void testSingleDictionaryColumnMemoryLimit() // construct a simulator that will hit the dictionary (low) memory limit by estimating the number of rows at the memory limit, and then setting large limits around this value int stripeMaxBytes = megabytes(100); - int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - (int) DICTIONARY_MEMORY_MAX_RANGE.toBytes(); + int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE; int expectedMaxRowCount = (int) (dictionaryMaxMemoryBytesLow / bytesPerEntry / uniquePercentage); DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, expectedMaxRowCount * 2, dictionaryMaxMemoryBytes, 0, column); @@ -276,7 +280,7 @@ public void testSingleDirectBytesLimit() // construct a simulator that will flip the column to direct and then hit the bytes limit int stripeMaxBytes = megabytes(100); - int expectedRowCountAtFlip = (int) ((dictionaryMaxMemoryBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes()) / bytesPerEntry); + int expectedRowCountAtFlip = ((dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE) / bytesPerEntry); int expectedMaxRowCountAtFull = stripeMaxBytes / bytesPerEntry; DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, expectedMaxRowCountAtFull, dictionaryMaxMemoryBytes, 0, column); @@ -379,7 +383,7 @@ public void testWideDictionaryAndNarrowDirectBytesLimit() // construct a simulator that will flip the column to direct and then hit the bytes limit int stripeMaxBytes = megabytes(2000); - int dictionaryMaxMemoryBytesLow = (int) (dictionaryMaxMemoryBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes()); + int dictionaryMaxMemoryBytesLow = dictionaryMaxMemoryBytes - DICTIONARY_ALMOST_FULL_MEMORY_RANGE; int expectedRowCountAtFlip = (int) ((dictionaryMaxMemoryBytesLow - (dictionaryEntries * dictionaryBytesPerEntry)) / (directBytesPerEntry * directUniquePercentage)); int maxRowCount = 10_000_000; DataSimulator simulator = new DataSimulator(0, stripeMaxBytes, maxRowCount, dictionaryMaxMemoryBytes, 0, directColumn, dictionaryColumn); @@ -421,6 +425,107 @@ public void testWideDictionaryAndNarrowDirectBytesLimit() } } + @Test + public void testDictionaryUsefulCheck() + { + int largeBytesPerEntry = 768; + TestDictionaryColumn largeDirectColumn = directColumn(largeBytesPerEntry, 1); + int smallBytePerEntry = 256; + TestDictionaryColumn smallDirectColumn = directColumn(smallBytePerEntry, 1); + + int dictionaryUsefulCheckPerChunkFrequency = 5; + int dictionaryUsefulCheckColumnSizeBytes = megabytes(10); + + // Set Stripe Size high, so that dictionary should be abandoned much before. + int stripeMinBytes = megabytes(275); + int stripeMaxBytes = megabytes(300); + + int dictionaryMaxMemoryBytes = megabytes(64); + int maxRowCount = 10_000_000; + + // This test should abandon dictionary due to dictionary usefulness check. It has 2 columns first column is 768 bytes + // second column is 256 bytes, so each row is 1KB. When large column reaches 10 MB, the large column dictionary will + // be abandoned. Once the large column dictionary is abandoned, there is only small column with dictionary. Small column + // will reach 10 MB and will get abandoned as well. The constant (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT + // is added as the check is happening once every dictionaryUsefulCheckPerChunkFrequency. + + int largeColumnAbandonRowCount = IntMath.divide(dictionaryUsefulCheckColumnSizeBytes, largeBytesPerEntry, CEILING); + int largeColumnAbandonUpperBound = largeColumnAbandonRowCount + (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT; + int smallColumnAbandonRowCount = IntMath.divide(dictionaryUsefulCheckColumnSizeBytes, smallBytePerEntry, CEILING); + int smallColumnAbandonUpperBound = smallColumnAbandonRowCount + (dictionaryUsefulCheckPerChunkFrequency + 1) * CHUNK_ROW_COUNT; + + DataSimulator simulator = new DataSimulator(stripeMinBytes, + stripeMaxBytes, + maxRowCount, + dictionaryMaxMemoryBytes, + 0, + DICTIONARY_ALMOST_FULL_MEMORY_RANGE, + dictionaryUsefulCheckPerChunkFrequency, + dictionaryUsefulCheckColumnSizeBytes, + largeDirectColumn, + smallDirectColumn); + + for (int loop = 0; loop < 3; loop++) { + assertFalse(simulator.isDictionaryMemoryFull()); + assertFalse(largeDirectColumn.isDirectEncoded()); + assertFalse(smallDirectColumn.isDirectEncoded()); + assertEquals(simulator.getRowCount(), 0); + assertEquals(simulator.getBufferedBytes(), 0); + + simulator.advanceToNextStateChange(); + + // the simulator should advance until the first large column switches to direct encoding. + assertFalse(simulator.isDictionaryMemoryFull()); + assertTrue(largeDirectColumn.isDirectEncoded()); + assertFalse(smallDirectColumn.isDirectEncoded()); + assertGreaterThanOrEqual(simulator.getRowCount(), largeColumnAbandonRowCount); + assertLessThan(simulator.getRowCount(), largeColumnAbandonUpperBound); + assertLessThan(simulator.getRowCount(), smallColumnAbandonRowCount); + + simulator.advanceToNextStateChange(); + + // the simulator should advance until the second column switches to direct encoding. + assertFalse(simulator.isDictionaryMemoryFull()); + assertTrue(largeDirectColumn.isDirectEncoded()); + assertTrue(smallDirectColumn.isDirectEncoded()); + assertLessThan(simulator.getBufferedBytes(), (long) stripeMaxBytes); + assertGreaterThanOrEqual(simulator.getRowCount(), smallColumnAbandonRowCount); + assertLessThan(simulator.getRowCount(), smallColumnAbandonUpperBound); + + simulator.finalOptimize(); + + assertFalse(simulator.isDictionaryMemoryFull()); + simulator.reset(); + } + } + + @Test + public void testIsDictionaryUsefulCheckRequired() + { + TestDictionaryColumn directColumn = directColumn(1024, 1); + int dictionaryColumnSizeCheckBytes = megabytes(1); + int dictionaryUsefulCheckPerChunkFrequency = 3; + DataSimulator simulator = new DataSimulator(megabytes(100), + megabytes(200), + 10_000_000, + megabytes(100), + 0, + DICTIONARY_ALMOST_FULL_MEMORY_RANGE, + dictionaryUsefulCheckPerChunkFrequency, + dictionaryColumnSizeCheckBytes, + directColumn); + + for (int loop = 0; loop < 3; loop++) { + assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes + 1)); + assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes)); + // Calling with 1 byte less should not increment the counter + assertFalse(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes - 1)); + + // 3rd time, it should return true as dictionaryUsefulCheckPerChunkFrequency is set to 3. + assertTrue(simulator.isUsefulCheckRequired(dictionaryColumnSizeCheckBytes)); + } + } + private static int megabytes(int size) { return toIntExact(new DataSize(size, Unit.MEGABYTE).toBytes()); @@ -445,20 +550,50 @@ public DataSimulator( int dictionaryMemoryMaxBytes, int otherColumnsBytesPerRow, TestDictionaryColumn... dictionaryColumns) + { + this(stripeMinBytes, + stripeMaxBytes, + stripeMaxRowCount, + dictionaryMemoryMaxBytes, + otherColumnsBytesPerRow, + DICTIONARY_ALMOST_FULL_MEMORY_RANGE, + dictionaryMemoryMaxBytes, + 0, + dictionaryColumns); + } + + public DataSimulator( + int stripeMinBytes, + int stripeMaxBytes, + int stripeMaxRowCount, + int dictionaryMemoryMaxBytes, + int otherColumnsBytesPerRow, + int dictionaryAlmostFullRangeBytes, + int dictionaryUsefulCheckPerChunkFrequency, + int dictionaryUsefulCheckColumnSizeBytes, + TestDictionaryColumn... dictionaryColumns) { this.stripeMaxBytes = stripeMaxBytes; this.stripeMaxRowCount = stripeMaxRowCount; this.otherColumnsBytesPerRow = otherColumnsBytesPerRow; this.dictionaryColumns = ImmutableSet.copyOf(dictionaryColumns); - this.optimizer = new DictionaryCompressionOptimizer(this.dictionaryColumns, stripeMinBytes, stripeMaxBytes, stripeMaxRowCount, dictionaryMemoryMaxBytes); + this.optimizer = new DictionaryCompressionOptimizer( + this.dictionaryColumns, + stripeMinBytes, + stripeMaxBytes, + stripeMaxRowCount, + dictionaryMemoryMaxBytes, + dictionaryAlmostFullRangeBytes, + dictionaryUsefulCheckColumnSizeBytes, + dictionaryUsefulCheckPerChunkFrequency); } public void advanceToNextStateChange() { List directColumnFlags = getDirectColumnFlags(); while (!optimizer.isFull(getBufferedBytes()) && getBufferedBytes() < stripeMaxBytes && getRowCount() < stripeMaxRowCount && directColumnFlags.equals(getDirectColumnFlags())) { - rowCount += 1024; + rowCount += CHUNK_ROW_COUNT; for (TestDictionaryColumn dictionaryColumn : dictionaryColumns) { dictionaryColumn.advanceTo(rowCount); } @@ -504,6 +639,11 @@ public int getRowCount() { return rowCount; } + + public boolean isUsefulCheckRequired(int dictionaryMemoryBytes) + { + return optimizer.isUsefulCheckRequired(dictionaryMemoryBytes); + } } private static TestDictionaryColumn directColumn(int bytesPerEntry, double uniquePercentage) diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java index 5ce2c385d137..c2503b0361bf 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java @@ -61,6 +61,10 @@ public void testProperties() int stripeMaxRowCount = 1_100_000; int rowGroupMaxRowCount = 15_000; DataSize dictionaryMaxMemory = new DataSize(13_000, KILOBYTE); + DataSize dictionaryMemoryRange = new DataSize(1_000, KILOBYTE); + int dictionaryUsefulCheckPerChunkFrequency = 9_999; + DataSize dictionaryUsefulCheckIncrement = new DataSize(500, KILOBYTE); + DataSize dictionaryUsefulCheckColumnSize = new DataSize(1, MEGABYTE); DataSize stringMaxStatisticsLimit = new DataSize(128, BYTE); DataSize maxCompressionBufferSize = new DataSize(512, KILOBYTE); OptionalInt compressionLevel = OptionalInt.of(5); @@ -75,6 +79,9 @@ public void testProperties() .withStripeMaxRowCount(stripeMaxRowCount) .withRowGroupMaxRowCount(rowGroupMaxRowCount) .withDictionaryMaxMemory(dictionaryMaxMemory) + .withDictionaryMemoryAlmostFullRange(dictionaryMemoryRange) + .withDictionaryUsefulCheckPerChunkFrequency(dictionaryUsefulCheckPerChunkFrequency) + .withDictionaryUsefulCheckColumnSize(dictionaryUsefulCheckColumnSize) .withMaxStringStatisticsLimit(stringMaxStatisticsLimit) .withMaxCompressionBufferSize(maxCompressionBufferSize) .withCompressionLevel(compressionLevel) @@ -90,6 +97,9 @@ public void testProperties() assertEquals(stripeMaxRowCount, options.getStripeMaxRowCount()); assertEquals(rowGroupMaxRowCount, options.getRowGroupMaxRowCount()); assertEquals(dictionaryMaxMemory, options.getDictionaryMaxMemory()); + assertEquals(dictionaryMemoryRange, options.getDictionaryMemoryAlmostFullRange()); + assertEquals(dictionaryUsefulCheckPerChunkFrequency, options.getDictionaryUsefulCheckPerChunkFrequency()); + assertEquals(dictionaryUsefulCheckColumnSize, options.getDictionaryUsefulCheckColumnSize()); assertEquals(stringMaxStatisticsLimit, options.getMaxStringStatisticsLimit()); assertEquals(maxCompressionBufferSize, options.getMaxCompressionBufferSize()); assertEquals(compressionLevel, options.getCompressionLevel()); @@ -108,6 +118,9 @@ public void testToString() int stripeMaxRowCount = 1_100_000; int rowGroupMaxRowCount = 15_000; DataSize dictionaryMaxMemory = new DataSize(13_000, KILOBYTE); + DataSize dictionaryMemoryRange = new DataSize(1_000, KILOBYTE); + int dictionaryUsefulCheckPerChunkFrequency = 9_999; + DataSize dictionaryUsefulCheckColumnSize = new DataSize(1, MEGABYTE); DataSize stringMaxStatisticsLimit = new DataSize(128, BYTE); DataSize maxCompressionBufferSize = new DataSize(512, KILOBYTE); DataSize dwrfStripeCacheMaxSize = new DataSize(4, MEGABYTE); @@ -124,6 +137,9 @@ public void testToString() .withStripeMaxRowCount(stripeMaxRowCount) .withRowGroupMaxRowCount(rowGroupMaxRowCount) .withDictionaryMaxMemory(dictionaryMaxMemory) + .withDictionaryMemoryAlmostFullRange(dictionaryMemoryRange) + .withDictionaryUsefulCheckPerChunkFrequency(dictionaryUsefulCheckPerChunkFrequency) + .withDictionaryUsefulCheckColumnSize(dictionaryUsefulCheckColumnSize) .withMaxStringStatisticsLimit(stringMaxStatisticsLimit) .withMaxCompressionBufferSize(maxCompressionBufferSize) .withCompressionLevel(compressionLevel) @@ -136,12 +152,12 @@ public void testToString() .withPreserveDirectEncodingStripeCount(preserveDirectEncodingStripeCount) .build(); - String expectedString = "OrcWriterOptions{stripeMinSize=13MB, stripeMaxSize=27MB, stripeMaxRowCount=1100000, " - + "rowGroupMaxRowCount=15000, dictionaryMaxMemory=13000kB, maxStringStatisticsLimit=128B, " - + "maxCompressionBufferSize=512kB, compressionLevel=OptionalInt[5], streamLayout=ByColumnSize{}, " - + "integerDictionaryEncodingEnabled=false, stringDictionarySortingEnabled=true, " - + "dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " - + "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0}"; + String expectedString = "OrcWriterOptions{stripeMinSize=13MB, stripeMaxSize=27MB, stripeMaxRowCount=1100000, rowGroupMaxRowCount=15000, " + + "dictionaryMaxMemory=13000kB, dictionaryMemoryAlmostFullRange=1000kB, dictionaryUsefulCheckPerChunkFrequency=9999, " + + "dictionaryUsefulCheckColumnSize=1MB, maxStringStatisticsLimit=128B, maxCompressionBufferSize=512kB, " + + "compressionLevel=OptionalInt[5], streamLayout=ByColumnSize{}, integerDictionaryEncodingEnabled=false, " + + "stringDictionarySortingEnabled=true, dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " + + "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0}"; assertEquals(expectedString, writerOptions.toString()); } }