Skip to content

Commit 46a98ba

Browse files
authored
HIVE-29354: Iceberg: [V3] Projection and Filter Pushdown for Shredded VARIANT Columns (#6224)
1 parent 33e11ec commit 46a98ba

29 files changed

+4328
-572
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
import org.apache.iceberg.expressions.Expressions;
4242
import org.apache.iceberg.expressions.Literal;
4343
import org.apache.iceberg.expressions.UnboundTerm;
44+
import org.apache.iceberg.mr.hive.variant.VariantPathUtil;
4445
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
4546
import org.apache.iceberg.transforms.Transforms;
47+
import org.apache.iceberg.types.Type;
4648
import org.apache.iceberg.types.Types;
4749
import org.apache.iceberg.util.DateTimeUtil;
4850
import org.apache.iceberg.util.NaNUtil;
@@ -110,7 +112,9 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
110112
*/
111113
private static Expression translateLeaf(PredicateLeaf leaf) {
112114
TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
113-
UnboundTerm<Object> column = SchemaUtils.toTerm(transformSpec);
115+
String columnName = transformSpec.getColumnName();
116+
117+
UnboundTerm<Object> column = toTerm(columnName, leaf, transformSpec);
114118

115119
switch (leaf.getOperator()) {
116120
case EQUALS:
@@ -141,6 +145,69 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
141145
}
142146
}
143147

148+
private static UnboundTerm<Object> toTerm(String columnName, PredicateLeaf leaf, TransformSpec transformSpec) {
149+
UnboundTerm<Object> column = tryVariantExtractTerm(columnName, leaf);
150+
if (column != null) {
151+
return column;
152+
}
153+
return SchemaUtils.toTerm(transformSpec);
154+
}
155+
156+
/**
157+
* Converts a shredded variant pseudo-column (e.g. {@code data.typed_value.age}) into an Iceberg variant extract term
158+
* (e.g. {@code extract(data, "$.age", "long")}).
159+
*
160+
* <p>This enables Iceberg to prune manifests/files using variant metrics produced when variant shredding is enabled.
161+
*/
162+
private static UnboundTerm<Object> tryVariantExtractTerm(String columnName, PredicateLeaf leaf) {
163+
int typedIdx = columnName.indexOf(VariantPathUtil.TYPED_VALUE_SEGMENT);
164+
if (typedIdx < 0) {
165+
return null;
166+
}
167+
168+
String variantColumn = columnName.substring(0, typedIdx);
169+
String extractedPath =
170+
columnName.substring(typedIdx + VariantPathUtil.TYPED_VALUE_SEGMENT.length());
171+
if (variantColumn.isEmpty() || extractedPath.isEmpty()) {
172+
return null;
173+
}
174+
175+
Type.PrimitiveType icebergType = extractPrimitiveType(leaf);
176+
if (icebergType == null) {
177+
return null;
178+
}
179+
180+
// Build an RFC9535 shorthand JSONPath-like path: "$.field" or "$.a.b"
181+
String jsonPath = "$." + extractedPath;
182+
try {
183+
return Expressions.extract(variantColumn, jsonPath, icebergType.toString());
184+
} catch (RuntimeException e) {
185+
// Invalid path/type; fall back to normal reference handling.
186+
return null;
187+
}
188+
}
189+
190+
private static Type.PrimitiveType extractPrimitiveType(PredicateLeaf leaf) {
191+
// Returned types must serialize (via toString) into Iceberg primitive type strings accepted by
192+
// Types.fromPrimitiveString.
193+
return switch (leaf.getType()) {
194+
case LONG -> Types.LongType.get();
195+
case FLOAT ->
196+
// Hive SARG uses FLOAT for both float/double; using double is the safest default.
197+
Types.DoubleType.get();
198+
case STRING -> Types.StringType.get();
199+
case BOOLEAN -> Types.BooleanType.get();
200+
case DATE -> Types.DateType.get();
201+
case TIMESTAMP ->
202+
// Iceberg timestamps are represented as micros in a long, but the Iceberg type is timestamp.
203+
Types.TimestampType.withoutZone();
204+
case DECIMAL ->
205+
// Precision/scale are not available in the SARG leaf type.
206+
null;
207+
default -> null;
208+
};
209+
}
210+
144211
// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
145212
// Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it.
146213
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java

Lines changed: 98 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.iceberg.expressions.ResidualEvaluator;
5555
import org.apache.iceberg.hive.HiveVersion;
5656
import org.apache.iceberg.mr.InputFormatConfig;
57+
import org.apache.iceberg.mr.hive.variant.VariantFilterRewriter;
5758
import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
5859
import org.apache.iceberg.mr.mapred.Container;
5960
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -91,65 +92,129 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
9192
}
9293

9394
/**
94-
* Converts the Hive filter found in the job conf to an Iceberg filter expression.
95-
* @param conf - job conf
96-
* @return - Iceberg data filter expression
95+
* Encapsulates planning-time and reader-time Iceberg filter expressions derived from Hive predicates.
9796
*/
98-
static Expression icebergDataFilterFromHiveConf(Configuration conf) {
99-
Expression icebergFilter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
100-
if (icebergFilter != null) {
101-
// in case we already have it prepared..
102-
return icebergFilter;
97+
private static final class FilterExpressions {
98+
99+
private static Expression planningFilter(Configuration conf) {
100+
// Planning-safe filter (extract removed) may already be serialized for reuse.
101+
Expression planningFilter = SerializationUtil
102+
.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
103+
if (planningFilter != null) {
104+
// in case we already have it prepared..
105+
return planningFilter;
106+
}
107+
// Reader filter should retain extract(...) for row-group pruning. Rebuild from Hive predicate to avoid losing
108+
// variant rewrites when planningFilter was stripped.
109+
Expression readerFilter = icebergDataFilterFromHiveConf(conf);
110+
if (readerFilter != null) {
111+
return VariantFilterRewriter.stripVariantExtractPredicates(readerFilter);
112+
}
113+
return null;
103114
}
104-
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
105-
if (hiveFilter != null) {
106-
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
107-
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
108-
return getFilterExpr(conf, exprNodeDesc);
115+
116+
private static Expression icebergDataFilterFromHiveConf(Configuration conf) {
117+
// Build an Iceberg filter from Hive's serialized predicate so we can preserve extract(...) terms for
118+
// reader-level pruning (e.g. Parquet shredded VARIANT row-group pruning).
119+
//
120+
// This intentionally does NOT consult FILTER_EXPRESSION, because FILTER_EXPRESSION must remain safe for
121+
// Iceberg planning-time utilities (some of which cannot stringify extract(...) terms).
122+
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
123+
if (hiveFilter != null) {
124+
ExprNodeGenericFuncDesc exprNodeDesc =
125+
SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
126+
return getFilterExpr(conf, exprNodeDesc);
127+
}
128+
return null;
129+
}
130+
131+
private static Expression planningResidual(FileScanTask task, Configuration conf) {
132+
return residual(task, conf, planningFilter(conf));
133+
}
134+
135+
private static Expression readerResidual(FileScanTask task, Configuration conf) {
136+
return residual(task, conf, icebergDataFilterFromHiveConf(conf));
137+
}
138+
139+
private static Expression residual(FileScanTask task, Configuration conf, Expression filter) {
140+
if (filter == null) {
141+
return Expressions.alwaysTrue();
142+
}
143+
boolean caseSensitive = conf.getBoolean(
144+
InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
145+
146+
return ResidualEvaluator.of(task.spec(), filter, caseSensitive)
147+
.residualFor(task.file().partition());
109148
}
110-
return null;
111149
}
112150

113151
/**
114-
* getFilterExpr extracts search argument from ExprNodeGenericFuncDesc and returns Iceberg Filter Expression
152+
* Builds an Iceberg filter expression from a Hive predicate expression node.
115153
* @param conf - job conf
116154
* @param exprNodeDesc - Describes a GenericFunc node
117155
* @return Iceberg Filter Expression
118156
*/
119157
static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
120-
if (exprNodeDesc != null) {
121-
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
122-
try {
123-
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
124-
} catch (UnsupportedOperationException e) {
125-
LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ", e);
158+
if (exprNodeDesc == null) {
159+
return null;
160+
}
161+
162+
ExprNodeGenericFuncDesc exprForSarg = exprNodeDesc;
163+
if (Boolean.parseBoolean(conf.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED))) {
164+
ExprNodeGenericFuncDesc rewritten = VariantFilterRewriter.rewriteForShredding(exprNodeDesc);
165+
if (rewritten != null) {
166+
exprForSarg = rewritten;
126167
}
127168
}
128-
return null;
169+
170+
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprForSarg);
171+
if (sarg == null) {
172+
return null;
173+
}
174+
175+
try {
176+
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
177+
} catch (UnsupportedOperationException e) {
178+
LOG.warn(
179+
"Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ",
180+
e);
181+
return null;
182+
}
129183
}
130184

131185
/**
132-
* Converts Hive filter found in the passed job conf to an Iceberg filter expression. Then evaluates this
133-
* against the task's partition value producing a residual filter expression.
186+
* Returns a residual expression that is safe to apply as a record-level filter.
187+
*
188+
* <p>This residual is derived from the task-level Iceberg planning filter (already extract-free) after
189+
* evaluating it against the task's partition value.
134190
* @param task - file scan task to evaluate the expression against
135191
* @param conf - job conf
136192
* @return - Iceberg residual filter expression
137193
*/
138194
public static Expression residualForTask(FileScanTask task, Configuration conf) {
139-
Expression dataFilter = icebergDataFilterFromHiveConf(conf);
140-
if (dataFilter == null) {
141-
return Expressions.alwaysTrue();
142-
}
143-
return ResidualEvaluator.of(
144-
task.spec(), dataFilter,
145-
conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT)
146-
).residualFor(task.file().partition());
195+
return FilterExpressions.planningResidual(task, conf);
196+
}
197+
198+
/**
199+
* Returns a residual expression intended only for reader-level pruning (best-effort).
200+
*
201+
* <p>This residual is derived from the task-level Iceberg filter after evaluating it against the task's
202+
* partition value. It may include {@code extract(...)} predicates and is suitable for formats/readers that
203+
* can leverage such terms for pruning (e.g. Parquet row-group pruning using shredded VARIANT columns).
204+
*
205+
* <p><strong>Do not</strong> use this for record-level residual filtering, as {@code extract} cannot be
206+
* evaluated at record level in Iceberg readers.
207+
*/
208+
public static Expression residualForReaderPruning(FileScanTask task, Configuration conf) {
209+
return FilterExpressions.readerResidual(task, conf);
147210
}
148211

149212
@Override
150213
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
151-
Expression filter = icebergDataFilterFromHiveConf(job);
214+
Expression filter = FilterExpressions.planningFilter(job);
152215
if (filter != null) {
216+
// Iceberg planning-time utilities may attempt to stringify the filter. Ensure the planning filter never
217+
// contains extract(...) or shredded typed_value references.
153218
job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
154219
}
155220

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@
187187
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
188188
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
189189
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
190-
import org.apache.iceberg.parquet.VariantUtil;
191190
import org.apache.iceberg.puffin.Blob;
192191
import org.apache.iceberg.puffin.BlobMetadata;
193192
import org.apache.iceberg.puffin.Puffin;
@@ -1749,8 +1748,7 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
17491748
if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) ||
17501749
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
17511750
hasOrcTimeInSchema(tableProps, tableSchema) ||
1752-
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema) ||
1753-
VariantUtil.shouldUseVariantShredding(tableProps::getProperty, tableSchema)) {
1751+
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
17541752
// disable vectorization
17551753
SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
17561754
queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));

0 commit comments

Comments
 (0)