[KYUUBI #7126][LINEAGE] Support merge into syntax in row level catalog #7127
Conversation
private def extractInstructionOutputs(instruction: Expression): Seq[Expression] = {
  instruction match {
    case p if p.nodeName == "Split" => getField[Seq[Expression]](p, "otherOutput")
According to Spark's implementation, we should use otherOutput:
https://github.com/apache/spark/blob/46b6ccbd93c4fe5c2b72f730a776a2739bdbc7b4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L467-L470
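For context, the getField helper used above appears to be reflection-based field access by name. A minimal self-contained sketch of that pattern (the Split case class here is a stand-in for Spark's internal MergeRows.Split instruction, not the real class):

```scala
// Stand-in for Spark's internal MergeRows.Split instruction; the real class
// is not public API, which is presumably why reflection is used at all.
case class Split(condition: String, splitOutput: Seq[String], otherOutput: Seq[String])

// Reflection-based access to a field by name, mirroring the shape of the
// PR's getField[Seq[Expression]](p, "otherOutput") call (assumed helper).
def getField[T](obj: AnyRef, fieldName: String): T = {
  val field = obj.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.get(obj).asInstanceOf[T]
}

val split = Split("cond", Seq("s.id"), Seq("t.id", "t.name"))
val other = getField[Seq[String]](split, "otherOutput")
println(other) // List(t.id, t.name)
```

Scala case-class parameters compile to private final fields, so getDeclaredField plus setAccessible(true) is enough here; against Spark's real plan nodes the field name must match the constructor parameter exactly, which is why the otherOutput vs. other distinction in this review matters.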
@@ -46,6 +47,9 @@ trait LineageParser {
   val SUBQUERY_COLUMN_IDENTIFIER = "__subquery__"
   val AGGREGATE_COUNT_COLUMN_IDENTIFIER = "__count__"
   val LOCAL_TABLE_IDENTIFIER = "__local__"
+  val METADATA_COL_ATTR_KEY = "__metadata_col"
+  val ORIGINAL_ROW_ID_VALUE_PREFIX: String = "__original_row_id_"
+  private val LOG = LoggerFactory.getLogger(classOf[LineageParser])
import org.apache.spark.internal.Logging
... with Logging
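A hedged sketch of the suggested pattern (Spark's org.apache.spark.internal.Logging trait is stubbed out below so the snippet is self-contained; the real trait wires up an SLF4J logger instead of println):

```scala
// Minimal stub of Spark's Logging trait, only so this sketch compiles on
// its own; in Spark the mixin provides logInfo/logWarning/logError etc.
trait Logging {
  protected def logInfo(msg: => String): Unit = println(s"INFO: $msg")
}

// Mixing the trait in replaces a hand-rolled LoggerFactory field.
// LineageParser and parse() here are simplified stand-ins for the PR's trait.
trait LineageParser extends Logging {
  def parse(planName: String): String = {
    logInfo(s"parsing plan: $planName")
    planName.toUpperCase
  }
}

val parser = new LineageParser {}
println(parser.parse("MergeRows")) // MERGEROWS
```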
Sorry, I didn't use this LOG, so I deleted it. Also, the trait LineageParser doesn't seem to work with Logging.
override def catalogName: String = {
  if (SPARK_RUNTIME_VERSION <= "3.1") {
    "org.apache.spark.sql.connector.InMemoryTableCatalog"
  } else if (SPARK_RUNTIME_VERSION <= "3.2") {
We only support Spark 3.3 and above now.
Kyuubi Spark Listener Extension
I see the lineage plugin can support 3.1.
We have officially removed support for Spark versions prior to 3.2; some leftover code/docs remain to be cleaned up. Such code is unreachable since its CI was removed.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff            @@
##           master    #7127   +/-   ##
========================================
  Coverage    0.00%    0.00%
========================================
  Files         697      700      +3
  Lines       43203    43411    +208
  Branches     5854     5886     +32
========================================
- Misses      43203    43411    +208

View full report in Codecov by Sentry.
@@ -307,7 +310,35 @@ trait LineageParser {
   extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
     k.withName(s"$table.${k.name}") -> v
   }
+  case p if p.nodeName == "MergeRows" =>
+    val instructionsOutputs =
Why did you ignore notMatchedBySourceInstructions?
}.collect {
  case (keyAttr: Attribute, instructionsOutput)
      if instructionsOutput
        .exists(!_.references.isEmpty) =>
.exists(_.references.nonEmpty)
  .map(extractInstructionOutputs)
val nextColumnsLineage = ListMap(p.output.indices.map { index =>
  val keyAttr = p.output(index)
  val instructionOutputs = instructionsOutputs.map(_(index))
Is this index match always correct?
Sorry, I am busy these days. I'll confirm later.
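For reference, the index-based zipping being questioned here can be illustrated with plain collections (Spark's Attribute/Expression types are replaced by strings; the names below are illustrative, not the PR's). The zip is only sound if every instruction's output list has the same length and ordering as p.output, which is exactly the concern raised:

```scala
// Stand-ins: p.output becomes a list of output column names, and each
// rewrite instruction contributes one output expression per column position.
val output = Seq("id", "name")
val instructionsOutputs = Seq(
  Seq("src.id", "src.name"), // e.g. the matched/update branch
  Seq("tgt.id", "tgt.name")) // e.g. the not-matched/keep branch

// For each output position, gather that position's expression from every
// instruction, mirroring instructionsOutputs.map(_(index)) in the PR.
val nextColumnsLineage = output.indices.map { index =>
  output(index) -> instructionsOutputs.map(_(index))
}.toMap

println(nextColumnsLineage("name")) // List(src.name, tgt.name)
```

If any instruction emitted fewer expressions than p.output has columns, the `_(index)` lookup would throw IndexOutOfBoundsException, so confirming the length invariant against Spark's MergeRows node is the key follow-up.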
Why are the changes needed?
In catalogs that support the row-level interface (Iceberg, etc.), MERGE INTO syntax is rewritten into a WriteDelta or ReplaceData operator by an analyzer rule. We should support extracting lineage relationships for these plan types.
How was this patch tested?
Add new tests for the row-level catalog.
Was this patch authored or co-authored using generative AI tooling?
No.