@@ -42,8 +42,10 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.mr.hive.variant.VariantPathUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.NaNUtil;
@@ -112,8 +114,8 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
private static Expression translateLeaf(PredicateLeaf leaf) {
TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
String columnName = transformSpec.getColumnName();
UnboundTerm<Object> column =
ObjectUtils.defaultIfNull(toTerm(columnName, transformSpec), Expressions.ref(columnName));

UnboundTerm<Object> column = resolveTerm(columnName, leaf, transformSpec);

switch (leaf.getOperator()) {
case EQUALS:
@@ -144,6 +146,15 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
}
}

private static UnboundTerm<Object> resolveTerm(String columnName, PredicateLeaf leaf, TransformSpec transformSpec) {
UnboundTerm<Object> column = tryVariantExtractTerm(columnName, leaf);
if (column != null) {
return column;
}
return ObjectUtils.defaultIfNull(
toTerm(columnName, transformSpec), Expressions.ref(columnName));
}

public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transformSpec) {
if (transformSpec == null) {
return null;
@@ -168,6 +179,66 @@ public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transf
}
}

/**
* Converts a shredded variant pseudo-column (e.g. {@code data.typed_value.age}) into an Iceberg variant extract term
* (e.g. {@code extract(data, "$.age", "long")}).
*
* <p>This enables Iceberg to prune manifests/files using variant metrics produced when variant shredding is enabled.
*/
private static UnboundTerm<Object> tryVariantExtractTerm(String columnName, PredicateLeaf leaf) {
int typedIdx = columnName.indexOf(VariantPathUtil.TYPED_VALUE_SEGMENT);
if (typedIdx < 0) {
return null;
}

String variantColumn = columnName.substring(0, typedIdx);
String extractedPath =
columnName.substring(typedIdx + VariantPathUtil.TYPED_VALUE_SEGMENT.length());
if (variantColumn.isEmpty() || extractedPath.isEmpty()) {
return null;
}

Type.PrimitiveType icebergType = extractPrimitiveType(leaf);
if (icebergType == null) {
return null;
}

// Build an RFC 9535 shorthand JSONPath, e.g. "$.field" or "$.a.b"
String jsonPath = "$." + extractedPath;
try {
return Expressions.extract(variantColumn, jsonPath, icebergType.toString());
} catch (RuntimeException e) {
// Invalid path/type; fall back to normal reference handling.
return null;
}
}
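
A minimal sketch (illustrative only, not part of this change) of the conversion above, assuming a LONG predicate on the shredded pseudo-column data.typed_value.age:

    // "data.typed_value.age" splits on the typed_value segment into variant column "data" and path "age"
    String jsonPath = "$." + "age";                                        // -> "$.age"
    Type.PrimitiveType icebergType = Types.LongType.get();                 // from the SARG leaf type (LONG)
    UnboundTerm<Object> term = Expressions.extract("data", jsonPath, icebergType.toString());
    // translateLeaf then applies the operator to this term, e.g. Expressions.greaterThan(term, 30L)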

private static Type.PrimitiveType extractPrimitiveType(PredicateLeaf leaf) {
// Returned types must serialize (via toString) into Iceberg primitive type strings accepted by
// Types.fromPrimitiveString.
switch (leaf.getType()) {
case LONG:
return Types.LongType.get();
case FLOAT:
// Hive SARG uses FLOAT for both float/double; using double is the safest default.
return Types.DoubleType.get();
case STRING:
return Types.StringType.get();
case BOOLEAN:
return Types.BooleanType.get();
case DATE:
return Types.DateType.get();
case TIMESTAMP:
// Iceberg timestamps are represented as micros in a long, but the Iceberg type is timestamp.
return Types.TimestampType.withoutZone();
case DECIMAL:
// Precision/scale are not available in the SARG leaf type.
return null;
default:
return null;
}
}
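
A small sanity-check sketch (not part of this change) for the assumption stated in the comment above, i.e. that toString() of the returned types round-trips through Types.fromPrimitiveString:

    Type.PrimitiveType doubleType = Types.DoubleType.get();
    // "double" parses back to the same primitive type
    Preconditions.checkState(Types.fromPrimitiveString(doubleType.toString()).equals(doubleType));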

// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
// Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it.
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()
@@ -54,6 +54,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.variant.VariantFilterRewriter;
import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -91,65 +92,129 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
}

/**
* Converts the Hive filter found in the job conf to an Iceberg filter expression.
* @param conf - job conf
* @return - Iceberg data filter expression
* Encapsulates planning-time and reader-time Iceberg filter expressions derived from Hive predicates.
*/
static Expression icebergDataFilterFromHiveConf(Configuration conf) {
Expression icebergFilter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
if (icebergFilter != null) {
// in case we already have it prepared..
return icebergFilter;
private static final class FilterExpressions {

private static Expression planningFilter(Configuration conf) {
// Planning-safe filter (extract removed) may already be serialized for reuse.
Expression planningFilter = SerializationUtil
.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
if (planningFilter != null) {
// in case we already have it prepared..
return planningFilter;
}
// The reader filter keeps extract(...) terms for row-group pruning; rebuild it from the Hive predicate
// and strip the variant extract predicates here to obtain a planning-safe filter.
Expression readerFilter = icebergDataFilterFromHiveConf(conf);
if (readerFilter != null) {
return VariantFilterRewriter.stripVariantExtractPredicates(readerFilter);
}
return null;
}
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
return getFilterExpr(conf, exprNodeDesc);

private static Expression icebergDataFilterFromHiveConf(Configuration conf) {
// Build an Iceberg filter from Hive's serialized predicate so we can preserve extract(...) terms for
// reader-level pruning (e.g. Parquet shredded VARIANT row-group pruning).
//
// This intentionally does NOT consult FILTER_EXPRESSION, because FILTER_EXPRESSION must remain safe for
// Iceberg planning-time utilities (some of which cannot stringify extract(...) terms).
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
ExprNodeGenericFuncDesc exprNodeDesc =
SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
return getFilterExpr(conf, exprNodeDesc);
}
return null;
}

private static Expression planningResidual(FileScanTask task, Configuration conf) {
return residual(task, conf, planningFilter(conf));
}

private static Expression readerResidual(FileScanTask task, Configuration conf) {
return residual(task, conf, icebergDataFilterFromHiveConf(conf));
}

private static Expression residual(FileScanTask task, Configuration conf, Expression filter) {
if (filter == null) {
return Expressions.alwaysTrue();
}
boolean caseSensitive = conf.getBoolean(
InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);

return ResidualEvaluator.of(task.spec(), filter, caseSensitive)
.residualFor(task.file().partition());
}
return null;
}
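
For context, a brief illustration (hypothetical table and values, not part of this change) of what the residual evaluation above produces: predicates already guaranteed by a file's partition value are dropped, and only the remainder needs to be applied later.

    // Table partitioned by identity("dept"); filter: dept = 'eng' AND salary > 100
    Expression filter = Expressions.and(
        Expressions.equal("dept", "eng"),
        Expressions.greaterThan("salary", 100));
    ResidualEvaluator evaluator = ResidualEvaluator.of(spec, filter, true /* caseSensitive */);
    // For a data file in partition dept='eng', the residual reduces to: salary > 100
    Expression residual = evaluator.residualFor(dataFile.partition());

Here spec and dataFile stand in for the task's partition spec and data file.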

/**
* getFilterExpr extracts search argument from ExprNodeGenericFuncDesc and returns Iceberg Filter Expression
* Builds an Iceberg filter expression from a Hive predicate expression node.
* @param conf - job conf
* @param exprNodeDesc - Describes a GenericFunc node
* @return Iceberg Filter Expression
*/
static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
if (exprNodeDesc != null) {
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
try {
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ", e);
if (exprNodeDesc == null) {
return null;
}

ExprNodeGenericFuncDesc exprForSarg = exprNodeDesc;
if (Boolean.parseBoolean(conf.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED))) {
ExprNodeGenericFuncDesc rewritten = VariantFilterRewriter.rewriteForShredding(exprNodeDesc);
if (rewritten != null) {
exprForSarg = rewritten;
}
}
return null;

SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprForSarg);
if (sarg == null) {
return null;
}

try {
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn(
"Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ",
e);
return null;
}
}

/**
* Converts Hive filter found in the passed job conf to an Iceberg filter expression. Then evaluates this
* against the task's partition value producing a residual filter expression.
* Returns a residual expression that is safe to apply as a record-level filter.
*
* <p>This residual is derived from the task-level Iceberg planning filter (already extract-free) after
* evaluating it against the task's partition value.
* @param task - file scan task to evaluate the expression against
* @param conf - job conf
* @return - Iceberg residual filter expression
*/
public static Expression residualForTask(FileScanTask task, Configuration conf) {
Expression dataFilter = icebergDataFilterFromHiveConf(conf);
if (dataFilter == null) {
return Expressions.alwaysTrue();
}
return ResidualEvaluator.of(
task.spec(), dataFilter,
conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT)
).residualFor(task.file().partition());
return FilterExpressions.planningResidual(task, conf);
}

/**
* Returns a residual expression intended only for reader-level pruning (best-effort).
*
* <p>This residual is derived from the task-level Iceberg filter after evaluating it against the task's
* partition value. It may include {@code extract(...)} predicates and is suitable for formats/readers that
* can leverage such terms for pruning (e.g. Parquet row-group pruning using shredded VARIANT columns).
*
* <p><strong>Do not</strong> use this for record-level residual filtering, as {@code extract} cannot be
* evaluated at record level in Iceberg readers.
*/
public static Expression residualForReaderPruning(FileScanTask task, Configuration conf) {
return FilterExpressions.readerResidual(task, conf);
}
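
A hedged usage sketch contrasting the two entry points (the calling reader code is hypothetical; only the two static methods above come from this change):

    // Record-level residual: extract-free, safe to evaluate row by row in the Iceberg record readers.
    Expression recordResidual = HiveIcebergInputFormat.residualForTask(task, conf);
    // Reader-level residual: may still contain extract(...) terms; pass it only to readers that can use
    // them for pruning, e.g. Parquet row-group filtering over shredded VARIANT columns.
    Expression pruningResidual = HiveIcebergInputFormat.residualForReaderPruning(task, conf);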

@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
Expression filter = icebergDataFilterFromHiveConf(job);
Expression filter = FilterExpressions.planningFilter(job);
if (filter != null) {
// Iceberg planning-time utilities may attempt to stringify the filter. Ensure the planning filter never
// contains extract(...) or shredded typed_value references.
job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
}

@@ -187,7 +187,6 @@
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
import org.apache.iceberg.parquet.VariantUtil;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
@@ -1752,8 +1751,7 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) ||
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema) ||
VariantUtil.shouldUseVariantShredding(tableProps::getProperty, tableSchema)) {
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
// disable vectorization
SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));