PROTOCOL.md (24 additions, 0 deletions)

@@ -3261,6 +3261,7 @@
children | The child operations for the op. This allows us to represent a predicate tree.
name | Specifies the name of a column. This is only applicable to column ops.
value | Specifies the value of a literal. This is only applicable to literal ops.
valueType | Specifies the value type of a column or a literal op. This is only applicable to column and literal ops.
exprCtx | Optional field that provides additional expression context.

The supported Ops:

@@ -3291,6 +3292,14 @@
ValueType | Description
"double" | Represents a Double type.
"timestamp" | Represents a timestamp in [Timestamp Format](#timestamp-format).

The exprCtx structure:

When present, the `exprCtx` field is a JSON object that may contain the following fields:

Field Name | Description
-|-
collationIdentifier | Optional string that specifies the collation to use for string comparisons. The format is `provider.collationName.version`. The provider is either `icu` for ICU-based collations (e.g., UNICODE_CI) or `spark` for Spark-specific collations (e.g., UTF8_LCASE). The version specifies the version of the collation used (e.g., ICU version "75.1"). Examples: "icu.UNICODE_CI.75.1" or "spark.UTF8_LCASE.75.1". When the collationIdentifier field is absent, the default UTF8_BINARY collation is used. For more information about collations in Delta, see the [collations protocol RFC](https://github.com/delta-io/delta/issues/2894).
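
Because the version segment itself contains a dot (e.g. "75.1"), a consumer that needs the three components should split on the first two dots only. A minimal sketch; `parseCollationIdentifier` is a hypothetical helper, not part of the protocol or this PR:

```scala
// Hypothetical helper: split "provider.collationName.version" into its parts.
// The split limit of 3 keeps the version segment (e.g. "75.1") intact.
def parseCollationIdentifier(id: String): Option[(String, String, String)] =
  id.split("\\.", 3) match {
    case Array(provider, name, version) => Some((provider, name, version))
    case _ => None // malformed identifier
  }

// parseCollationIdentifier("icu.UNICODE_CI.75.1")
//   == Some(("icu", "UNICODE_CI", "75.1"))
// parseCollationIdentifier("spark.UTF8_LCASE.75.1")
//   == Some(("spark", "UTF8_LCASE", "75.1"))
```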


Examples

@@ -3346,6 +3355,21 @@
}
```

4. Collated string comparison.

```json
{
"op": "equal",
"children": [
{"op": "column", "name": "name", "valueType": "string"},
{"op": "literal", "value": "TestValue", "valueType": "string"}
],
"exprCtx": {
"collationIdentifier": "icu.UNICODE_CI.75.1"
}
}
```

## Delta Sharing Streaming Specs
Delta Sharing Streaming is supported starting from delta-sharing-spark 0.6.0. As it is implemented
on top of Spark Structured Streaming, it leverages a pull model to consume the new data of the shared
@@ -0,0 +1,35 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.filters

import org.apache.spark.sql.catalyst.expressions.{Expression => SqlExpression}
import org.apache.spark.sql.types.{StringType => SqlStringType}

object CollationExtractor {
  // Extracts the collation identifier from two expressions if both are strings
  // with the same collation.
  def extractCollationIdentifier(
      left: SqlExpression,
      right: SqlExpression): Option[String] = {
    // The 2.12 client depends on Spark 3.5, which does not support collations,
    // so we cannot extract a collation identifier here. We return None to
    // default to UTF8 binary comparisons: collations are only a writer feature
    // in Delta, so Spark 3.5 does not support them but should still be able to
    // read the table.
    None
  }
}
@@ -0,0 +1,53 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.filters

import com.ibm.icu.util.VersionInfo.ICU_VERSION
import org.apache.spark.sql.catalyst.expressions.{Expression => SqlExpression}
import org.apache.spark.sql.types.{StringType => SqlStringType}

object CollationExtractor {
  // Extracts the collation identifier from two expressions if both are strings
  // with the same collation.
  def extractCollationIdentifier(
      left: SqlExpression,
      right: SqlExpression): Option[String] = {
    (left.dataType, right.dataType) match {
      case (leftStr: SqlStringType, rightStr: SqlStringType) =>
        // Spark needs to make sure to only compare strings of the same collation.
        if (leftStr != rightStr) {
          throw new IllegalArgumentException(
            s"Cannot compare strings with different collations: " +
              s"'${leftStr.typeName}' vs '${rightStr.typeName}'"
          )
        }

        val typeName = leftStr.typeName
        if (typeName.startsWith("string collate")) {
          val collationName = typeName.stripPrefix("string collate").trim
          val provider = if (collationName.equalsIgnoreCase("UTF8_LCASE")) "spark" else "icu"
          val version = s"${ICU_VERSION.getMajor}.${ICU_VERSION.getMinor}"
          Some(s"$provider.$collationName.$version")
        } else {
          None
        }

      case _ =>
        None
    }
  }
}
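
For intuition, a rough sketch of how this behaves on Spark 4.0 (Scala 2.13). It assumes `SqlStringType("UNICODE_CI")` constructs a collated string type, as the review discussion later in this PR suggests, and that the bundled ICU version is 75.1; both are illustrative assumptions, not guarantees:

```scala
// Illustrative expectations only; actual values depend on the Spark/ICU versions.
import org.apache.spark.sql.catalyst.expressions.{
  AttributeReference => SqlAttributeReference,
  Literal => SqlLiteral
}
import org.apache.spark.sql.types.{StringType => SqlStringType}

val collated = SqlStringType("UNICODE_CI") // assumed Spark 4.0 constructor
val col = SqlAttributeReference("name", collated)()
val lit = SqlLiteral.create("TestValue", collated)

CollationExtractor.extractCollationIdentifier(col, lit)
// => Some("icu.UNICODE_CI.75.1")   (typeName is "string collate UNICODE_CI")

// Default UTF8_BINARY strings have typeName "string", so no identifier:
CollationExtractor.extractCollationIdentifier(
  SqlAttributeReference("name", SqlStringType)(), SqlLiteral("TestValue"))
// => None
```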
@@ -27,6 +27,13 @@
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
*/
case class EvalContext(partitionValues: Map[String, String])

/**
* The expression context containing metadata about the operation.
*
* @param collationIdentifier The collation identifier for string comparisons, if applicable.
*/
case class ExprContext(collationIdentifier: Option[String] = None)

/**
* The data types supported by the filtering operations.
*/
@@ -243,33 +250,38 @@ case class IsNullOp(children: Seq[LeafOp]) extends NonLeafOp with UnaryOp {
* @param children Expected size is 2.
*/

-case class EqualOp(children: Seq[LeafOp]) extends NonLeafOp with BinaryOp {
+case class EqualOp(children: Seq[LeafOp], exprCtx: Option[ExprContext] = None)
+  extends NonLeafOp with BinaryOp {
  override def validate(forV2: Boolean = false): Unit = validateChildren(children, forV2)

  override def eval(ctx: EvalContext): Any = EvalHelper.equal(children, ctx)
}

-case class LessThanOp(children: Seq[LeafOp]) extends NonLeafOp with BinaryOp {
+case class LessThanOp(children: Seq[LeafOp], exprCtx: Option[ExprContext] = None)
+  extends NonLeafOp with BinaryOp {
  override def validate(forV2: Boolean = false): Unit = validateChildren(children, forV2)

  override def eval(ctx: EvalContext): Any = EvalHelper.lessThan(children, ctx)
}

-case class LessThanOrEqualOp(children: Seq[LeafOp]) extends NonLeafOp with BinaryOp {
+case class LessThanOrEqualOp(children: Seq[LeafOp], exprCtx: Option[ExprContext] = None)
+  extends NonLeafOp with BinaryOp {
  override def validate(forV2: Boolean = false): Unit = validateChildren(children, forV2)

  override def eval(ctx: EvalContext): Any =
    EvalHelper.lessThan(children, ctx) || EvalHelper.equal(children, ctx)
}

-case class GreaterThanOp(children: Seq[LeafOp]) extends NonLeafOp with BinaryOp {
+case class GreaterThanOp(children: Seq[LeafOp], exprCtx: Option[ExprContext] = None)
+  extends NonLeafOp with BinaryOp {
  override def validate(forV2: Boolean = false): Unit = validateChildren(children, forV2)

  override def eval(ctx: EvalContext): Any =
    !EvalHelper.lessThan(children, ctx) && !EvalHelper.equal(children, ctx)
}

-case class GreaterThanOrEqualOp(children: Seq[LeafOp]) extends NonLeafOp with BinaryOp {
+case class GreaterThanOrEqualOp(children: Seq[LeafOp], exprCtx: Option[ExprContext] = None)
+  extends NonLeafOp with BinaryOp {
  override def validate(forV2: Boolean = false): Unit = validateChildren(children, forV2)

  override def eval(ctx: EvalContext): Any = !EvalHelper.lessThan(children, ctx)
client/src/main/scala/io/delta/sharing/filters/OpConverter.scala (45 additions, 8 deletions)

@@ -93,15 +93,30 @@ object OpConverter {

    // Convert comparison operators.
    case SqlEqualTo(left, right) =>
-     EqualOp(Seq(convertAsLeaf(left), convertAsLeaf(right)))
+     EqualOp(
+       Seq(convertAsLeaf(left), convertAsLeaf(right)),
+       extractExprContext(left, right)
+     )
    case SqlLessThan(left, right) =>
-     LessThanOp(Seq(convertAsLeaf(left), convertAsLeaf(right)))
+     LessThanOp(
+       Seq(convertAsLeaf(left), convertAsLeaf(right)),
+       extractExprContext(left, right)
+     )
    case SqlLessThanOrEqual(left, right) =>
-     LessThanOrEqualOp(Seq(convertAsLeaf(left), convertAsLeaf(right)))
+     LessThanOrEqualOp(
+       Seq(convertAsLeaf(left), convertAsLeaf(right)),
+       extractExprContext(left, right)
+     )
    case SqlGreaterThan(left, right) =>
-     GreaterThanOp(Seq(convertAsLeaf(left), convertAsLeaf(right)))
+     GreaterThanOp(
+       Seq(convertAsLeaf(left), convertAsLeaf(right)),
+       extractExprContext(left, right)
+     )
    case SqlGreaterThanOrEqual(left, right) =>
-     GreaterThanOrEqualOp(Seq(convertAsLeaf(left), convertAsLeaf(right)))
+     GreaterThanOrEqualOp(
+       Seq(convertAsLeaf(left), convertAsLeaf(right)),
+       extractExprContext(left, right)
+     )

    // Convert null operations.
    case SqlIsNull(child) =>
@@ -118,7 +133,9 @@ object OpConverter {
        )
      }
      val leafOp = convertAsLeaf(value)
-     list.map(e => EqualOp(Seq(leafOp, convertAsLeaf(e)))) match {
+     list.map(e =>
+       EqualOp(Seq(leafOp, convertAsLeaf(e)), extractExprContext(value, e))
+     ) match {
        case Seq() =>
          throw new IllegalArgumentException("The In predicate must have at least one entry")
        case Seq(child) => child
@@ -131,13 +148,14 @@ object OpConverter {
      val rightOp = convertAsLeaf(right)
      val leftIsNullOp = IsNullOp(Seq(leftOp))
      val rightIsNullOp = IsNullOp(Seq(rightOp))
+     val exprCtx = extractExprContext(left, right)
      // Either both are null, or none is null and they are equal.
      OrOp(Seq(
        AndOp(Seq(leftIsNullOp, rightIsNullOp)),
        AndOp(Seq(
          NotOp(Seq(leftIsNullOp)),
          NotOp(Seq(rightIsNullOp)),
-         EqualOp(Seq(leftOp, rightOp))))
+         EqualOp(Seq(leftOp, rightOp), exprCtx)))
      ))

    // Unsupported expressions.

@@ -186,7 +204,9 @@ object OpConverter {
      case SqlBooleanType => OpDataTypes.BoolType
      case SqlIntegerType => OpDataTypes.IntType
      case SqlLongType => OpDataTypes.LongType
-     case SqlStringType => OpDataTypes.StringType
+     // We need to match all string types (with different collations),
+     // and not just the case object, which is UTF8_BINARY collated.
+     case _: SqlStringType => OpDataTypes.StringType
Review thread on this line:

Collaborator: nit: can revert

Author: We can't actually. Without this change, the line only matched the case object of the StringType class, but we want it to also match every instance we create for collated types, e.g. StringType("UTF8_LCASE").

Collaborator: Consider adding this as a comment.

Author: Done!
      case SqlDateType => OpDataTypes.DateType
      case SqlDoubleType => OpDataTypes.DoubleType
      case SqlFloatType => OpDataTypes.FloatType

@@ -207,4 +227,21 @@ object OpConverter {
      case _ => lit.toString
    }
  }

  // Extracts expression context from two expressions, including collation information
  // if both are strings with the same collation. This is a generic function that can
  // be extended to extract other dimensions of context in the future.
  private def extractExprContext(
      left: SqlExpression,
      right: SqlExpression): Option[ExprContext] = {
    val collationId = CollationExtractor.extractCollationIdentifier(left, right)

    // If we have any context information, return an ExprContext.
    if (collationId.isDefined) {
      Some(ExprContext(collationIdentifier = collationId))
    } else {
      None
    }
  }

}
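
End to end, converting `name = 'TestValue'` over a UNICODE_CI-collated column would now produce an op tree that serializes to the exprCtx JSON shown in PROTOCOL.md example 4. A rough sketch, assuming `ColumnOp(name, valueType)` and `LiteralOp(value, valueType)` signatures that mirror the protocol fields (the collation identifier value is illustrative):

```scala
// Sketch only: the op tree for `name = 'TestValue'` on a UNICODE_CI column.
val op = EqualOp(
  Seq(
    ColumnOp(name = "name", valueType = OpDataTypes.StringType),
    LiteralOp(value = "TestValue", valueType = OpDataTypes.StringType)
  ),
  exprCtx = Some(ExprContext(collationIdentifier = Some("icu.UNICODE_CI.75.1")))
)
// Serializes to:
// {"op":"equal","children":[...],"exprCtx":{"collationIdentifier":"icu.UNICODE_CI.75.1"}}
```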
@@ -0,0 +1,53 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.filters

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{
  AttributeReference => SqlAttributeReference,
  EqualTo => SqlEqualTo,
  Literal => SqlLiteral
}
import org.apache.spark.sql.types.{
  StringType => SqlStringType
}
class OpConverterCollationSuite extends SparkFunSuite {

  test("UTF8_BINARY collation test") {
    val defaultStringType = SqlStringType
    val sqlColumn = SqlAttributeReference("email", defaultStringType)()
    val sqlLiteral = SqlLiteral("test@example.com")
    val sqlEq = SqlEqualTo(sqlColumn, sqlLiteral)

    val op = OpConverter.convert(Seq(sqlEq)).get.asInstanceOf[EqualOp]
    op.validate()

    val columnOp = op.children(0).asInstanceOf[ColumnOp]
    val literalOp = op.children(1).asInstanceOf[LiteralOp]
    assert(columnOp.valueType == OpDataTypes.StringType)
    assert(literalOp.valueType == OpDataTypes.StringType)

    // UTF8_BINARY (default) should work fine on Scala 2.12.
    assert(op.exprCtx.isEmpty)
  }

  // Note: Collated string types (SqlStringType with a collation parameter) don't exist in
  // Spark 3.5 (Scala 2.12). They were added in Spark 4.0 (Scala 2.13).
  // Therefore, we cannot test collation behavior in the Scala 2.12 version of this suite.
  // All collation-specific tests are in the Scala 2.13 version of this suite.
}
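
The Scala 2.13 counterpart is not shown in this diff. For context, a hedged sketch of what such a test might look like, again assuming the hypothetical `SqlStringType("UNICODE_CI")` constructor from Spark 4.0:

```scala
// Hypothetical Scala 2.13 / Spark 4.0 test; not part of this diff.
test("UNICODE_CI collation test") {
  val collatedType = SqlStringType("UNICODE_CI") // assumed Spark 4.0 API
  val sqlColumn = SqlAttributeReference("name", collatedType)()
  val sqlLiteral = SqlLiteral.create("TestValue", collatedType)
  val sqlEq = SqlEqualTo(sqlColumn, sqlLiteral)

  val op = OpConverter.convert(Seq(sqlEq)).get.asInstanceOf[EqualOp]
  op.validate()

  // The ICU version suffix depends on the library bundled with Spark.
  assert(op.exprCtx.exists(_.collationIdentifier.exists(_.startsWith("icu.UNICODE_CI."))))
}
```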