feat(native-pos) Use HashTable caching in Broadcast joins #26806
Reviewer's Guide
Enables Velox hash table caching for broadcast (replicated) joins in Presto native execution by passing a useCachedHashTable flag on HashJoinNodes for appropriate join types, and wires up new system configs to control broadcast hash table caching and exchange eager fetch behavior, exposing them to Velox query configuration.
Sequence diagram for broadcast join planning with hash table caching
sequenceDiagram
actor User
participant PrestoPlanner
participant VeloxQueryPlanConverterBase as PlanConverter
participant SystemConfig
participant core_HashJoinNode as HashJoinNode
participant VeloxEngine as VeloxEngine
User->>PrestoPlanner: submit query with broadcast join
PrestoPlanner->>PlanConverter: toVeloxQueryPlan(joinNode)
PlanConverter->>SystemConfig: broadcastJoinTableCachingEnabled()
SystemConfig-->>PlanConverter: bool enabled
PlanConverter->>PlanConverter: detect REPLICATED distribution
PlanConverter->>PlanConverter: compute useCachedHashTable = isBroadcastJoin && enabled
alt useCachedHashTable true
PlanConverter->>HashJoinNode: new HashJoinNode(..., useCachedHashTable=true)
else useCachedHashTable false
PlanConverter->>HashJoinNode: new HashJoinNode(..., useCachedHashTable omitted)
end
PlanConverter-->>PrestoPlanner: HashJoinNode wrapped in ProjectNode
PrestoPlanner-->>VeloxEngine: submit Velox plan
VeloxEngine->>VeloxEngine: HashBuild uses cached hash table per worker when useCachedHashTable=true
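The flag computation in the sequence diagram above can be sketched in isolation. This is a minimal sketch, not the converter's actual code: `DistributionType` and `shouldUseCachedHashTable` are hypothetical stand-ins, and `cachingEnabled` stands in for the value the real converter would read from `SystemConfig::broadcastJoinTableCachingEnabled()`. Treating a missing distribution type as "not a broadcast join" is an assumption.

```cpp
#include <optional>

// Hypothetical stand-in for the Presto join distribution type.
enum class DistributionType { kPartitioned, kReplicated };

// Returns true only when the join is a broadcast (REPLICATED) join and
// caching is enabled. Assumption: an unset distribution type is treated
// as non-broadcast.
inline bool shouldUseCachedHashTable(
    const std::optional<DistributionType>& distributionType,
    bool cachingEnabled) {
  const bool isBroadcastJoin = distributionType.has_value() &&
      *distributionType == DistributionType::kReplicated;
  return isBroadcastJoin && cachingEnabled;
}
```

With this shape, a partitioned join never requests a cached table, whatever the config value is.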
Class diagram for updated SystemConfig and HashJoinNode usage
classDiagram
class SystemConfig {
+static kBroadcastJoinTableCachingEnabled : string_view
+static kExchangeEagerFetchEnabled : string_view
+broadcastJoinTableCachingEnabled() bool
+exchangeEagerFetchEnabled() bool
}
class VeloxQueryPlanConverterBase {
+toVeloxQueryPlan(node SemiJoinNodePtr, tableWriteInfo TableWriteInfo, taskId PrestoTaskId) core_PlanNodePtr
+toVeloxQueryPlan(node JoinNodePtr, tableWriteInfo TableWriteInfo, taskId PrestoTaskId) core_PlanNodePtr
+toVeloxQueryPlan(node LeftSemiJoinNodePtr, tableWriteInfo TableWriteInfo, taskId PrestoTaskId) core_PlanNodePtr
-exprConverter_ ExprConverter
-typeParser_ TypeParser
}
class core_HashJoinNode {
+id : PlanNodeId
+joinType : JoinType
+nullAware : bool
+leftKeys : vector_ExprPtr
+rightKeys : vector_ExprPtr
+filter : ExprPtr
+left : core_PlanNodePtr
+right : core_PlanNodePtr
+outputType : RowTypePtr
+useCachedHashTable : bool
+HashJoinNode(id PlanNodeId, joinType JoinType, nullAware bool, leftKeys vector_ExprPtr, rightKeys vector_ExprPtr, filter ExprPtr, left core_PlanNodePtr, right core_PlanNodePtr, outputType RowTypePtr)
+HashJoinNode(id PlanNodeId, joinType JoinType, nullAware bool, leftKeys vector_ExprPtr, rightKeys vector_ExprPtr, filter ExprPtr, left core_PlanNodePtr, right core_PlanNodePtr, outputType RowTypePtr, useCachedHashTable bool)
}
VeloxQueryPlanConverterBase --> SystemConfig : uses
VeloxQueryPlanConverterBase --> core_HashJoinNode : constructs
class SemiJoinNode {
+id : PlanNodeId
+distributionType : optional_DistributionType
}
class JoinNode {
+id : PlanNodeId
+distributionType : optional_JoinDistributionType
+filter : optional_Expr
+left : PlanNodePtr
+right : PlanNodePtr
+outputVariables : vector_Variable
}
class LeftSemiJoinNode {
+id : PlanNodeId
+distributionType : optional_DistributionType
+left : PlanNodePtr
+right : PlanNodePtr
}
VeloxQueryPlanConverterBase --> SemiJoinNode : converts
VeloxQueryPlanConverterBase --> JoinNode : converts
VeloxQueryPlanConverterBase --> LeftSemiJoinNode : converts
Flow diagram for system config mapping to Velox query config
flowchart LR
A(SystemConfig properties)
B(kBroadcastJoinTableCachingEnabled)
C(kExchangeEagerFetchEnabled)
D(PrestoToVeloxQueryConfig.updateFromSystemConfigs)
E(Velox core_QueryConfig)
F(core_HashBuild and exchanges)
A --> B
A --> C
B --> D
C --> D
D --> E
E --> F
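The flow above, where system properties pass through a mapping step into the Velox-side config, can be illustrated with a small sketch. It assumes a plain string-to-string map on both sides; `ConfigMapping`, its `veloxConfig` field, and `toVeloxConfigs` are hypothetical names, and the key strings in the usage note are placeholders rather than the real property names.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical mapping entry: which system property feeds which Velox key.
struct ConfigMapping {
  std::string prestoSystemConfig;
  std::string veloxConfig;
};

using ConfigMap = std::unordered_map<std::string, std::string>;

// Copies every mapped system property that is set into the Velox-side map,
// stored under the corresponding Velox key. Unmapped properties are ignored.
inline ConfigMap toVeloxConfigs(
    const ConfigMap& systemConfigs,
    const std::vector<ConfigMapping>& mappings) {
  ConfigMap veloxConfigs;
  for (const auto& mapping : mappings) {
    const auto it = systemConfigs.find(mapping.prestoSystemConfig);
    if (it != systemConfigs.end()) {
      veloxConfigs[mapping.veloxConfig] = it->second;
    }
  }
  return veloxConfigs;
}
```

For example, a mapping of `{"presto.example-flag", "velox.example_flag"}` applied to a system map containing `presto.example-flag=true` would yield a Velox map with the single entry `velox.example_flag=true`.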
Summary: Velox introduced the ability to cache hash tables in the HashBuild operator. This is useful in broadcast joins because the hash table can be built once per worker and reused by all the join tasks that land on that worker.
Velox PRs: facebookincubator/velox#15754 and facebookincubator/velox#15768
This diff sets `useCachedHashTable=true` during Velox `HashBuild` node construction when the join is a broadcast join.
Differential Revision: D88900941
4525c88 to 63c1e5a
Hey there - I've reviewed your changes - here's some feedback:
- The broadcast-join `useCachedHashTable` logic is duplicated in several places (semi/anti join, regular join, semi-project); consider extracting a small helper or utility function to encapsulate the `isBroadcastJoin && broadcastJoinTableCachingEnabled()` check and HashJoinNode construction to keep the plan converter logic DRY and easier to maintain.
- The new `#include "presto_cpp/main/types/PrestoTaskId.h"` in `PrestoToVeloxQueryPlan.cpp` does not appear to be used in this diff; if it’s unnecessary, removing it would keep dependencies minimal.
- In the new HashJoinNode constructions, the expression `joinType == core::JoinType::kAnti ? true : false` can be simplified to `joinType == core::JoinType::kAnti` to reduce noise and improve readability.
     }},

    {.prestoSystemConfig =
         std::string(SystemConfig::kExchangeEagerFetchEnabled),
Why is kExchangeEagerFetchEnabled in this PR?
      "order-by-spill-enabled"};
  static constexpr std::string_view kMaxSpillBytes{"max-spill-bytes"};

  /// When enabled, hash tables built for broadcast joins are cached and reused
Why is kBroadcastJoinTableCachingEnabled in this PR? Is HashTable caching dependent on it? It seems this change belongs to another PR.
    rightKeys.emplace_back(exprConverter_.toVeloxExpr(right));
  }

  // Check if this is a broadcast join (REPLICATED distribution)
This logic is repeated 3 times. Is it possible to extract a common function for it, e.g. `createHashJoinNode(...)`?
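One possible shape for such a helper is sketched below. It is a hedged illustration only: `HashJoinNode` here is a heavily reduced, hypothetical stand-in for `core::HashJoinNode`, the helper's signature is invented for the example, and deriving the null-aware flag from `joinType == JoinType::kAnti` is an assumption about where that expression is used in the diff.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, reduced stand-ins for the Velox and Presto types.
enum class JoinType { kInner, kLeftSemiFilter, kAnti };
enum class DistributionType { kPartitioned, kReplicated };

struct HashJoinNode {
  std::string id;
  JoinType joinType;
  bool nullAware;
  bool useCachedHashTable;
};

// Centralizes the broadcast-join check so each call site passes only the
// join-specific inputs. `cachingEnabled` stands in for the system config
// lookup.
inline std::shared_ptr<const HashJoinNode> createHashJoinNode(
    std::string id,
    JoinType joinType,
    const std::optional<DistributionType>& distributionType,
    bool cachingEnabled) {
  // An empty optional never compares equal to kReplicated.
  const bool isBroadcastJoin =
      distributionType == DistributionType::kReplicated;
  return std::make_shared<const HashJoinNode>(HashJoinNode{
      std::move(id),
      joinType,
      joinType == JoinType::kAnti, // no `? true : false` needed
      isBroadcastJoin && cachingEnabled});
}
```

Each of the three conversion paths could then call the helper with its own join type and distribution type, so the caching condition lives in a single place.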