Commit 402e743

Iceberg: Table-level column stats filter support
1 parent f06aa95 commit 402e743

File tree

4 files changed: +97 -57 lines

- iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
- iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 50 additions & 55 deletions
@@ -29,7 +29,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -42,6 +41,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.collections.MapUtils;
@@ -186,7 +186,6 @@
 import org.apache.iceberg.puffin.BlobMetadata;
 import org.apache.iceberg.puffin.Puffin;
 import org.apache.iceberg.puffin.PuffinCompressionCodec;
-import org.apache.iceberg.puffin.PuffinReader;
 import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -201,7 +200,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.SerializationUtil;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -637,20 +635,27 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
       long snapshotId = tbl.currentSnapshot().snapshotId();
       long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();

-      colStats.forEach(statsObj -> {
-        byte[] serializeColStats = SerializationUtils.serialize(statsObj);
-        puffinWriter.add(
-            new Blob(
-                ColumnStatisticsObj.class.getSimpleName(),
-                ImmutableList.of(1),
-                snapshotId,
-                snapshotSequenceNumber,
-                ByteBuffer.wrap(serializeColStats),
-                PuffinCompressionCodec.NONE,
-                ImmutableMap.of("partition",
-                    String.valueOf(statsObj.getStatsDesc().getPartName()))
-            ));
+      colStats.forEach(stats -> {
+        boolean isTblLevel = stats.getStatsDesc().isIsTblLevel();
+
+        for (Serializable statsObj : isTblLevel ? stats.getStatsObj() : Collections.singletonList(stats)) {
+          byte[] serializeColStats = SerializationUtils.serialize(statsObj);
+          puffinWriter.add(
+              new Blob(
+                  ColumnStatisticsObj.class.getSimpleName(),
+                  ImmutableList.of(isTblLevel ? tbl.spec().schema().findField(
+                      ((ColumnStatisticsObj) statsObj).getColName()).fieldId() : 1),
+                  snapshotId,
+                  snapshotSequenceNumber,
+                  ByteBuffer.wrap(serializeColStats),
+                  PuffinCompressionCodec.NONE,
+                  isTblLevel ?
+                      ImmutableMap.of("specId", String.valueOf(tbl.spec().specId())) :
+                      ImmutableMap.of("partition", String.valueOf(stats.getStatsDesc().getPartName()))
+              ));
+        }
       });
+
       puffinWriter.finish();

       statisticsFile =
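
Note on the new blob layout (illustration only, helper name hypothetical): table-level stats are now written as one blob per column, tagged with the column's Iceberg field ID and a "specId" property, while the existing partition-level layout keeps one blob per partition under a "partition" property. A reader can tell the two apart from the metadata alone:

    // Hypothetical helper, not part of this commit: table-level blobs carry
    // a "specId" property, partition-level blobs a "partition" property.
    static boolean isTableLevelBlob(BlobMetadata metadata) {
      return metadata.properties().containsKey("specId");
    }
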
@@ -693,17 +698,28 @@ private boolean canProvideColStats(Table table, long snapshotId) {
   }

   @Override
-  public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      List<String> colNames) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+
     Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
+    if (snapshot == null) {
+      return Lists.newArrayList();
+    }

-    ColumnStatistics emptyStats = new ColumnStatistics();
-    if (snapshot != null) {
-      return IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
-          .map(statsPath -> readColStats(table, statsPath, null).get(0))
-          .orElse(emptyStats).getStatsObj();
+    Predicate<BlobMetadata> filter;
+    if (colNames != null) {
+      Set<String> columns = Sets.newHashSet(colNames);
+      filter = metadata -> {
+        int specId = Integer.parseInt(metadata.properties().get("specId"));
+        String column = table.specs().get(specId).schema().findColumnName(metadata.inputFields().get(0));
+        return columns.contains(column);
+      };
+    } else {
+      filter = null;
     }
-    return emptyStats.getStatsObj();
+
+    return IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
   }

   @Override
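
Illustrative caller of the new overload (variables hypothetical): passing column names prunes blobs before deserialization, while passing null keeps the previous read-everything behaviour.

    // storageHandler and hmsTable are assumed to exist in the caller.
    List<ColumnStatisticsObj> idStats =
        storageHandler.getColStatistics(hmsTable, ImmutableList.of("id"));
    List<ColumnStatisticsObj> allStats =
        storageHandler.getColStatistics(hmsTable, null);  // no filter
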
@@ -720,9 +736,10 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
         MetastoreConf.ConfVars.STATS_NDV_DENSITY_FUNCTION);
     double ndvTuner = MetastoreConf.getDoubleVar(getConf(), MetastoreConf.ConfVars.STATS_NDV_TUNER);

-    List<ColumnStatistics> partStats = IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
-        .map(statsPath -> readColStats(table, statsPath, Sets.newHashSet(partNames)))
-        .orElse(Collections.emptyList());
+    Set<String> partitions = Sets.newHashSet(partNames);
+    Predicate<BlobMetadata> filter = metadata -> partitions.contains(metadata.properties().get("partition"));
+
+    List<ColumnStatistics> partStats = IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);

     partStats.forEach(colStats ->
         colStats.getStatsObj().removeIf(statsObj -> !colNames.contains(statsObj.getColName())));
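
Since the filter is a plain java.util.function.Predicate over BlobMetadata, filters compose; a sketch with hypothetical values:

    // Combine a partition filter with a field-id filter via Predicate.and.
    Set<String> partitions = Sets.newHashSet("p=1", "p=2");
    Predicate<BlobMetadata> byPartition =
        metadata -> partitions.contains(metadata.properties().get("partition"));
    Predicate<BlobMetadata> byFieldId = metadata -> metadata.inputFields().contains(1);
    Predicate<BlobMetadata> combined = byPartition.and(byFieldId);
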
@@ -736,30 +753,6 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
     return new AggrStats(colStatsList, partStats.size());
   }

-  private List<ColumnStatistics> readColStats(Table table, Path statsPath, Set<String> partNames) {
-    List<ColumnStatistics> colStats = Lists.newArrayList();
-
-    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
-      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
-
-      if (partNames != null) {
-        blobMetadata = blobMetadata.stream()
-            .filter(metadata -> partNames.contains(metadata.properties().get("partition")))
-            .collect(Collectors.toList());
-      }
-      Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
-      LOG.info("Using col stats from : {}", statsPath);
-
-      while (it.hasNext()) {
-        byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
-        colStats.add(SerializationUtils.deserialize(byteBuffer));
-      }
-    } catch (Exception e) {
-      LOG.warn(" Unable to read col stats: ", e);
-    }
-    return colStats;
-  }
-
   @Override
   public boolean canComputeQueryUsingStats(Partish partish) {
     org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
@@ -799,22 +792,24 @@ private boolean shouldRewriteColStats(Table tbl) {
   private void checkAndMergeColStats(List<ColumnStatistics> statsNew, Table tbl) throws InvalidObjectException {
     Long previousSnapshotId = tbl.currentSnapshot().parentId();
     if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
-      List<ColumnStatistics> statsOld = IcebergTableUtil.getColStatsPath(tbl, previousSnapshotId)
-          .map(statsPath -> readColStats(tbl, statsPath, null))
-          .orElse(Collections.emptyList());

       boolean isTblLevel = statsNew.get(0).getStatsDesc().isIsTblLevel();
       Map<String, ColumnStatistics> oldStatsMap = Maps.newHashMap();

+      List<?> statsOld = IcebergTableUtil.readColStats(tbl, previousSnapshotId, null);
+
       if (!isTblLevel) {
-        for (ColumnStatistics statsObjOld : statsOld) {
+        for (ColumnStatistics statsObjOld : (List<ColumnStatistics>) statsOld) {
           oldStatsMap.put(statsObjOld.getStatsDesc().getPartName(), statsObjOld);
         }
+      } else {
+        statsOld = Collections.singletonList(
+            new ColumnStatistics(null, (List<ColumnStatisticsObj>) statsOld));
       }
       for (ColumnStatistics statsObjNew : statsNew) {
         String partitionKey = statsObjNew.getStatsDesc().getPartName();
         ColumnStatistics statsObjOld = isTblLevel ?
-            statsOld.get(0) : oldStatsMap.get(partitionKey);
+            (ColumnStatistics) statsOld.get(0) : oldStatsMap.get(partitionKey);

         if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
           MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
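
The unchecked casts above reflect that readColStats returns whatever was serialized per blob: ColumnStatisticsObj items for table-level files, ColumnStatistics items for partition-level ones. A sketch (snapshotId and partFilter are hypothetical caller state):

    // Element type follows the blob layout the file was written with.
    List<ColumnStatisticsObj> tblLevel = IcebergTableUtil.readColStats(tbl, snapshotId, null);
    List<ColumnStatistics> partLevel = IcebergTableUtil.readColStats(tbl, snapshotId, partFilter);
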

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

Lines changed: 38 additions & 0 deletions
@@ -20,16 +20,20 @@
 package org.apache.iceberg.mr.hive;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -77,11 +81,18 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.puffin.BlobMetadata;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinReader;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.StructProjection;
@@ -547,4 +558,31 @@ public static TransformSpec getTransformSpec(Table table, String transformName,
     return spec;
   }

+  public static <T> List<T> readColStats(Table table, Long snapshotId, Predicate<BlobMetadata> filter) {
+    List<T> colStats = Lists.newArrayList();
+
+    Optional<Path> statsPath = IcebergTableUtil.getColStatsPath(table, snapshotId);
+    if (!statsPath.isPresent()) {
+      return colStats;
+    }
+    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.get().toString())).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+
+      if (filter != null) {
+        blobMetadata = blobMetadata.stream().filter(filter)
+            .collect(Collectors.toList());
+      }
+      Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
+      LOG.info("Using col stats from : {}", statsPath);
+
+      while (it.hasNext()) {
+        byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
+        colStats.add(SerializationUtils.deserialize(byteBuffer));
+      }
+    } catch (Exception e) {
+      LOG.warn(" Unable to read col stats: ", e);
+    }
+    return colStats;
+  }
+
 }
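
A minimal usage sketch of the new helper (table, snapshot, and the partition name are hypothetical caller state); a null filter reads every blob:

    // Read only the stats blob(s) written for partition "p=1".
    Predicate<BlobMetadata> filter =
        metadata -> "p=1".equals(metadata.properties().get("partition"));
    List<ColumnStatistics> stats =
        IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
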

ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

Lines changed: 1 addition & 1 deletion
@@ -6121,7 +6121,7 @@ public List<ColumnStatisticsObj> getTableColumnStatistics(
     List<ColumnStatisticsObj> retv = null;
     try {
       if (tbl.isNonNative() && tbl.getStorageHandler().canProvideColStatistics(tbl)) {
-        return tbl.getStorageHandler().getColStatistics(tbl);
+        return tbl.getStorageHandler().getColStatistics(tbl, colNames);
       }
       if (checkTransactional) {
         AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl);

ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java

Lines changed: 8 additions & 1 deletion
@@ -286,12 +286,19 @@ default boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata
   /**
    * Returns column statistics (upper/lower bounds, number of Null/NaN values, NDVs, histogram).
    * @param table table object
+   * @param colNames list of column names
    * @return list of ColumnStatisticsObj objects
    */
-  default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+      List<String> colNames) {
     return null;
   }

+  @Deprecated
+  default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table) {
+    return getColStatistics(table, null);
+  }
+
   /**
    * Returns an aggregated column statistics for the supplied partition list
    * @param table table object
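
For handler authors (sketch only, class name hypothetical): the single-argument overload is now deprecated and delegates with colNames == null, so existing implementations keep compiling; new implementations should override the two-argument form.

    // Hypothetical handler overriding only the new overload.
    public class MyStorageHandler extends DefaultStorageHandler {
      @Override
      public List<ColumnStatisticsObj> getColStatistics(
          org.apache.hadoop.hive.ql.metadata.Table table, List<String> colNames) {
        return Collections.emptyList();  // e.g. no stats available
      }
    }
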
