[SPARK-53880][SQL] Support DSv2 in PushVariantIntoScan #52578
base: master
Conversation
Oh, thank you, @viirya .
BTW, maybe the test case is generated by
cc @huaxingao, @cloud-fan too from the previous approach.
Yes. This is co-authored using Claude Code.
I saw all tests passing in the last CI run except for two Java style issues. Fixed.
Hmm, all tests are passing again. But what is this error in the documentation generation pipeline?
It seems to be due to the Javadoc. I'm looking into it.
 * <p>
 * For example, if a query accesses {@code variant_get(v, '$.a', 'int')} and
 * {@code variant_get(v, '$.b', 'string')}, the extracted schema would be
 * {@code struct<0:int, 1:string>} where field ordinals correspond to the access order.
Ya, these three lines seem to be the root cause.
Yea, the `{@code }` blocks are the issue. Not sure why this Javadoc tag cannot be parsed.
 * <p>
 * For example, if a query accesses `variant_get(v, '$.a', 'int')` and
 * `variant_get(v, '$.b', 'string')`, the extracted schema would be
 * `struct<0:int, 1:string>` where field ordinals correspond to the access order.
Suggested change:
 * `struct<0:int, 1:string>` where field ordinals correspond to the access order.
 * `struct<a:int, b:string>` where field ordinals correspond to the access order.
It is actually the ordinals. That is how PushVariantIntoScan rewrites the type.
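To make the ordinal-based rewrite concrete, here is a minimal sketch (not code from this PR; the class name is illustrative) of the struct shape that the Javadoc example above describes, built with Spark's public type API:

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class VariantRewriteSketch {
  public static void main(String[] args) {
    // The two variant_get calls from the example are mapped to struct fields
    // whose names are the access ordinals ("0", "1"), not the JSON paths.
    StructType extracted = new StructType()
        .add("0", DataTypes.IntegerType)   // variant_get(v, '$.a', 'int')
        .add("1", DataTypes.StringType);   // variant_get(v, '$.b', 'string')
    System.out.println(extracted.simpleString()); // struct<0:int,1:string>
  }
}
```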
 * @since 4.1.0
 */
@Evolving
public final class VariantAccessInfo implements Serializable {
This can be a Java record?
Looks okay. Is it necessary though?
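For illustration only, a record version could look roughly like the sketch below; the field names are guesses, not the actual API of `VariantAccessInfo` in this PR:

```java
import java.io.Serializable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;

// Hypothetical record form of the class under review; the real VariantAccessInfo
// may carry different fields. A record gets final fields plus generated
// equals/hashCode/toString for free, which is what the reviewer's question is about.
@Evolving
public record VariantAccessInfoSketch(String path, DataType targetType, int ordinal)
    implements Serializable {}
```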
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
// BEGIN-V2-SUPPORT: DataSource V2 imports
what does this special comment marker mean?
Just added it at the beginning to keep the V2 code clearly separated. I will remove it.
I feel that reverting the original PR first and then submitting a brand-new PR makes the git history clearer.
Okay for me. @dongjoon-hyun @cloud-fan @huaxingao What do you think?
I opened the revert PR #52590. Once it is merged, I will rebase this PR.
I'm fine either way, @viirya. I guess it's more up to @huaxingao's preference.
Thanks @viirya for the PR! Appreciate you putting together an alternative. A couple of thoughts before we move forward:
Column pruning is not the same as this variant scan pushdown. This feature changes the datatype (variant type to struct type), not just prunes fields from it. Mixing column pruning with variant scan pushdown causes confusion about the API semantics and unexpected errors in DSv2 datasource implementations. Actually, you can access all fields of a variant column and this variant scan pushdown still works effectively. It is not about reducing the read surface but about optimizing variant access and usage in the read and in the query plan. Not to mention that, with the current approach, if Spark pushes a variant scan into a DSv2 implementation that doesn't know about it, there will be unexpected errors.
No. The test just verifies that the schema was changed. InMemoryTable/Scan doesn't actually do the variant pushdown. In other words, it just makes sure that you changed the variant type to a struct type in the InMemoryTable. It doesn't actually test whether the variant scan happens correctly, does it? If the table/scan doesn't support it, what will happen? For example, we could simply change a variant type to any other type and keep the test passing because the schema matches. By the way, there are actually issues with the test, but that is not the only reason to revert the PR.
Thanks @viirya for the reply! If I understand correctly, the core concern is: "If Spark pushes a variant scan into a DSv2 source that doesn't understand it, we'll see unexpected errors." I agree: that's exactly the failure mode we should avoid. Would it be acceptable if I add a planner-side guard so this only happens when the source explicitly opts in? On tests: I understand my InMemoryTable test might have issues. Can we fix it to exercise the planner contract correctly, or do you consider a built-in DSv2 Parquet test required for a DSv2 change?
Thanks @huaxingao for the discussion and understanding. I think we need an explicit DSv2 API to define the contract between Spark and the datasource implementation around this variant pushdown feature. That is what this PR proposes to do.
That is also what this PR proposes to do for testing: adding dedicated DSv2 variant pushdown tests for both row-based and vectorized readers with good test coverage.
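As a rough illustration of what such an explicit contract could look like (the interface and method names below are hypothetical, written in the style of the existing `SupportsPushDown*` mix-ins, and may not match what this PR actually adds):

```java
import org.apache.spark.annotation.Evolving;

// Hypothetical opt-in mix-in for a DSv2 ScanBuilder. Only sources that implement it
// would receive the variant pushdown; everything else keeps the plain variant column,
// so an unaware source never sees a rewritten schema.
@Evolving
public interface SupportsPushDownVariantAccess {
  /**
   * Called with the variant accesses extracted by PushVariantIntoScan
   * (VariantAccessInfo is the class added by this PR). Returning false tells
   * Spark to keep the original variant column and evaluate variant_get itself.
   */
  boolean pushVariantAccesses(VariantAccessInfo[] accesses);
}
```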
What changes were proposed in this pull request?
This patch adds DSv2 support to the optimization rule `PushVariantIntoScan`. The `PushVariantIntoScan` rule currently only supports the DSv1 Parquet (`ParquetFileFormat`) source, which limits the effectiveness of variant type usage on DSv2.
Why are the changes needed?
Although #52522 recently tried to add DSv2 support, that implementation implicitly binds `pruneColumns` to this variant access pushdown, which could cause unexpected errors on DSv2 datasources that don't support it. It also breaks the API semantics. We need an explicit API between Spark and the DSv2 datasource for this feature. #52522 also didn't actually test the DSv2 variant pushdown feature on the built-in DSv2 Parquet datasource, only on InMemoryTable. This patch reverts #52522 and proposes a new approach with comprehensive test coverage.
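A sketch of roughly the failure mode described above, using the existing `pruneColumns` API (the class name is hypothetical; the real API call shown is Spark's DSv2 column-pruning mix-in):

```java
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

// A source that implements only column pruning records whatever required schema
// Spark hands it. If the variant pushdown is conveyed through pruneColumns, the
// variant column silently arrives as struct<0:int,1:string>, a shape a variant-storing
// source never expects to serve, hence the unexpected errors.
public abstract class NaivePruningScanBuilder implements SupportsPushDownRequiredColumns {
  private StructType requiredSchema;

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // The builder has no idea the variant column was rewritten.
    this.requiredSchema = requiredSchema;
  }
}
```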
Does this PR introduce any user-facing change?
Yes. After this PR, if users enable `spark.sql.variant.pushVariantIntoScan`, they can push variant column accesses down into a DSv2 datasource if the source supports it.
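A minimal usage sketch, assuming a DSv2 table named `events` with a variant column `v` (table and column names are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VariantPushdownExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("variant-pushdown").getOrCreate();
    // The config named in this PR description; enable it to activate the rule.
    spark.conf().set("spark.sql.variant.pushVariantIntoScan", "true");
    // With a DSv2 source that supports the pushdown, the two variant_get calls
    // are rewritten into struct field accesses inside the scan.
    Dataset<Row> df = spark.sql(
        "SELECT variant_get(v, '$.a', 'int') AS a, "
            + "variant_get(v, '$.b', 'string') AS b FROM events");
    df.explain(true);
    spark.stop();
  }
}
```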
How was this patch tested?
Added new unit test suites `PushVariantIntoScanV2Suite` and `PushVariantIntoScanV2VectorizedSuite`.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code v2.0.13