-
Notifications
You must be signed in to change notification settings - Fork 194
Adding queryParamsHashId into tablePath for Snapshot/CDF/Streaming queries #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala
Outdated
Show resolved
Hide resolved
client/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala
Outdated
Show resolved
Hide resolved
client/src/main/scala/org/apache/spark/delta/sharing/PreSignedUrlCache.scala
Show resolved
Hide resolved
spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala
Outdated
Show resolved
Hide resolved
This reverts commit 85dfb47.
740b089
to
ebb9bc3
Compare
@@ -764,11 +764,21 @@ case class DeltaSharingSource( | |||
// version. | |||
val filteredActions = fileActions.filter{ indexedFile => indexedFile.getFileAction != null } | |||
|
|||
// For streaming queries, build the query parameters hash using the start/end version. | |||
// All the files between start and end version are cached for a tablePath. | |||
val queryParamsHashId = QueryUtils.getQueryParamsHashId(startVersion, endOffset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use the DS request param to generate the hash id instead?
(startVersion
+ endOffset
) is not enough. The request will differ based on the value of isStartingVersion
and options.readChangeFeed
as well.
Maybe we could use a global variable similar to latestRefreshFunc
to keep track of the hash id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
endOffset already includes isStartingVersion, but readChangeFeed is missing.
This is a good suggestion. I have added a global variable to capture query parameters. getFiles and getCDFfiles requires different inputs, so we should be safe this way.
params.snapshotAtAnalysis.filesForScan(Nil, None, None, this) | ||
.map(f => toDeltaSharingPath(f).toString) | ||
val queryParamsHashId = QueryUtils.getQueryParamsHashId( | ||
version = params.snapshotAtAnalysis.version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though the chance might be very low, it's still possible that between this params.snapshotAtAnalysis.version
(triggers GetTableMetadata) and the params.snapshotAtAnalysis.filesForScan
below (triggers QueryTable), the latest table version has changed?
We could use the version returned from QueryTable
call to generate the hash id instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good point. I changed the logic to build hash id inside filesForScan and return a hash id string to here.
client/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala
Outdated
Show resolved
Hide resolved
partitionFilters.map(_.sql).mkString(";"), | ||
dataFilters.map(_.sql).mkString(";"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Similar to above, I think it's better use the real request params to generate the hash id. partitionFilters
and dataFilters
might be redundant since jsonPredicateHints
is included.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed partitionFilters and dataFilters, and add a new predicates which seem to also be generated from partitionFilters and dataFilters.
This change updates the keying strategy of the existing pre-signed URL cache (which stores fileId -> fileUrl mappings) to account for different query shapes on the same table. Previously, the cache key only included the table path, so a new query with different parameters could overwrite existing cache entries. This update ensures correctness by incorporating the full query context into the cache key.
Query Parameters by Query Type:
Snapshot Queries:
Change Data Feed (CDF) Queries:
Filters are excluded from the key, as files are fetched for the entire version range and filtered in memory. CDF queries also do not support limits.
Streaming Queries:
Streaming parquet files
Streaming CDF files
Similar to CDF, filters and limits are excluded from the key, since filtering is applied in memory after fetching all relevant files.
This update prevents cache collisions between distinct query shapes on the same table and ensures accurate file-to-URL resolution for all supported query types.