colocated-without-hints #13943
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13943 +/- ##
============================================
+ Coverage 61.75% 63.76% +2.01%
- Complexity 207 1535 +1328
============================================
Files 2436 2624 +188
Lines 133233 144580 +11347
Branches 20636 22122 +1486
============================================
+ Hits 82274 92189 +9915
- Misses 44911 45589 +678
- Partials 6048 6802 +754
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
@gortiz are you aiming to make it so that the entire plan can be colocated when possible? On the Uber side, we were planning to add this feature back this year. In the implementation we use internally for a big use case, we are able to colocate the entire plan by inferring partition keys for each stage. However, it did so in the physical optimization phase and hence is no longer used in OSS.
The idea is that Pinot should be able to automatically infer whether joins/group-bys can be optimized using the partition distribution. That is not the case right now. Instead, users need to specify the tableOption hint. So this PR adds implicit hints if they are not already present (and, if a hint is present, populates it with default values for whatever is not provided).
What do you mean by physical optimization? I don't think the term is very well defined in Pinot. Specifically, we use it to describe how to execute stages in different workers, but it is also used to describe how to map RelNodes into Pinot operators and/or the indexes used. Assuming you mean that you are applying this optimization after the Calcite rules have been applied (i.e. in or close to WorkerManager), that is what we are doing here as well. I would love to move the distribution calculations into Calcite (given it has that concept), but this PR is simpler. The code we have right now applies the optimization in WorkerManager, but only when it makes sense and the hint is present. What this PR does is add the hint even if the query doesn't include it.
…mization at startup and query time
Force-pushed from 400675a to 860437b.
…fig.useImplicitColocatedByDefault
@@ -343,6 +343,8 @@ public static class Broker {
    public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
        "pinot.broker.enable.partition.metadata.manager";
    public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
    public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
The added rule is to auto-fill the missing info in the table hint. Should we make the config more explicit about that? This rule will be applied even without JOIN, and the leaf stage worker will process each partition in parallel.
"leaf stage worker will process each partition in parallel"
By default we set parallelism to 1, which AFAIU was already the default. Therefore this should not change. Am I wrong?
That is partition parallelism, i.e. how many threads to use in the following stage after partition is joined. When partition info is set, each partition is processed as a separate query in the leaf stage
I've changed that so it does not provide a value for parallelism. Anyway, you are right about the naming. Do you have a suggestion for a new and more precise property name?
Something like AUTO_HINT_PARTITION_INFO or AUTO_APPLY_PARTITION_HINT?
Remember to change this
}

if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
  tableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
(MAJOR) This is not robust and can cause wrong behavior on real-time or hybrid tables. We'll need to check both the real-time and offline tables and apply the hint when they match each other, or when only one exists.
Do you think we should apply the same check done in WorkerManager that verifies that there is at least one server that is fully replicated? I would suggest:
- If the table is hybrid
- And it is not fully replicated
  - If the query includes SET implicitColocateJoin=true, then we should fail.
  - If the query does not include SET implicitColocateJoin=true, then we should not apply the optimization.
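To make the proposal concrete, here is a minimal, standalone sketch of that decision. The class, enum, and parameter names are hypothetical and not taken from the PR; the APPLY branch for every other combination is also an assumption, since the suggestion only covers hybrid tables that are not fully replicated.

```java
public final class ImplicitColocationDecision {
  /** What the planner does with the implicit hint for one table (hypothetical enum). */
  public enum Outcome { APPLY, SKIP, FAIL }

  private ImplicitColocationDecision() {
  }

  /**
   * @param hybrid whether the table has both an offline and a real-time part
   * @param fullyReplicated whether at least one server holds a full copy of each partition
   * @param requestedInQuery whether the query itself set the option to true
   */
  public static Outcome decide(boolean hybrid, boolean fullyReplicated, boolean requestedInQuery) {
    if (hybrid && !fullyReplicated) {
      // The user asked for it explicitly: fail fast. Otherwise quietly skip the optimization.
      return requestedInQuery ? Outcome.FAIL : Outcome.SKIP;
    }
    // Assumption: every other combination is safe to optimize.
    return Outcome.APPLY;
  }
}
```

With this shape, a default-enabled broker never fails a query that did not ask for the optimization, while an explicit request surfaces the problem immediately.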
How complicated is this check? Are we going to replicate a lot of logic within WorkerManager?
I was thinking we only apply the hint when both tables have the same partitionColumn, partitionFunctionName and numPartitions. Then I realized another issue: we need to know if a table exists to figure out whether it is unpartitioned or doesn't exist.
"we need to know if a table exists"
What do you mean? If the table doesn't exist we would have failed at validation time, right? Do you mean whether it actually has segments?
My concern here is that WorkerManager fails if there is at least one partition that is not fully replicated (see pinot/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java, Line 450 in 49126f2: Preconditions.checkState(!fullyReplicatedServers.isEmpty(), ...). If we only compare partitionColumn, partitionFunctionName and numPartitions and the table is not fully replicated, we will end up failing at WorkerManager.
My suggestion is to modify WorkerManager to provide a method similar to getPartitionTableInfo that we can call from the rule. This method should check whether the partition info is equal and whether, for each partition, there is at least one server with a full copy of the partition. The main differences from the current getPartitionTableInfo are that:
- It cannot return PartitionTableInfo, given it is an internal class the rule doesn't care about.
- It cannot call getTablePartitionInfo, which doesn't only get the info but also compares values on the hint with what is stored in the routing manager.
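A rough sketch of the kind of check being proposed, written against plain Java types so it stands alone. `Info`, `resolve`, and the other names are hypothetical stand-ins, not the actual WorkerManager API; a `null` argument means that table type has no partition info, and a `null` result means the hint should not be inferred.

```java
import java.util.List;
import java.util.Set;

public final class PartitionInfoCheck {
  /** Hypothetical, simplified view of one table's partition metadata. */
  public static final class Info {
    final String column;
    final String function;
    final int numPartitions;
    /** For each partition id (list index), the servers holding a full copy of that partition. */
    final List<Set<String>> fullCopyServers;

    public Info(String column, String function, int numPartitions, List<Set<String>> fullCopyServers) {
      this.column = column;
      this.function = function;
      this.numPartitions = numPartitions;
      this.fullCopyServers = fullCopyServers;
    }
  }

  private PartitionInfoCheck() {
  }

  static boolean sameScheme(Info a, Info b) {
    return a.column.equals(b.column) && a.function.equalsIgnoreCase(b.function)
        && a.numPartitions == b.numPartitions;
  }

  static boolean everyPartitionHasFullCopy(Info info) {
    return info.fullCopyServers.size() == info.numPartitions
        && info.fullCopyServers.stream().noneMatch(Set::isEmpty);
  }

  /** Returns the info to build the implicit hint from, or null when no hint should be inferred. */
  public static Info resolve(boolean offlineExists, Info offline, boolean realtimeExists, Info realtime) {
    if (offlineExists && realtimeExists) {
      // Hybrid table: both sides must be partitioned, and partitioned the same way.
      if (offline == null || realtime == null || !sameScheme(offline, realtime)) {
        return null;
      }
      return everyPartitionHasFullCopy(offline) && everyPartitionHasFullCopy(realtime) ? offline : null;
    }
    Info only = offlineExists ? offline : realtime;
    return only != null && everyPartitionHasFullCopy(only) ? only : null;
  }
}
```

Returning `null` instead of throwing keeps the rule free to skip the optimization, leaving the decision to fail (if any) to the caller.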
I've changed the code. Can you take a look at the new implementation, which delegates to WorkerManager?
…ableScan in PinotImplicitTableHintRule
Force-pushed from 97c4621 to e3f2096.
I've just pushed a couple of changes to delegate more logic to WorkerManager. I didn't have time to verify that it works as expected, but I'm going to be traveling tomorrow and I wanted to be sure the changes are not lost.
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Force-pushed from 99cf25a to 598b17e.
private static TableOptions calculateTableOptions(
    @Nullable RelHint relHint, TablePartitionInfo tablePartitionInfo, LogicalTableScan tableScan) {
  if (relHint == null) {
    return ImmutablePinotImplicitTableHintRule.TableOptions.builder()
I checked out the code but cannot find this class in IDE. Is this auto generated by the annotation framework?
This is the class that is autogenerated by immutables.
String tableName = RelToPlanNodeConverter.getTableNameFromTableScan(tableScan);

String offlineName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
TablePartitionInfo offlineTpi = _workerManager.getTablePartitionInfo(offlineName);
One possible scenario is that both tables exist, but only one table has partition info. In that case we should return null instead.
Shouldn't we just simplify this with WorkerManager.calculatePartitionTableInfo(tableName) now that we have that function?
I've changed this code to delegate to WorkerManager. The rule is now simpler, and all the logic (except the merge) is in WorkerManager now.
@Jackie-Jiang Question: Should we also set
@gortiz Yes! I think we can enable it by default
Force-pushed from 8a09ca2 to 24036a9.
Mostly good. Nice job extracting the common logic in WorkerManager
private static RelHint getTableOptionHint(LogicalTableScan tableScan) {
  return tableScan.getHints().stream()
      .filter(relHint -> relHint.hintName.equals(PinotHintOptions.TABLE_HINT_OPTIONS))
      .findAny()
We want to pick the first hint that matches the name. The reason is that user might override the same hint multiple times, and the first one should be the closest one.
Suggest adding this as a util method in PinotHintStrategyTable
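For illustration, such a util could look roughly like the sketch below. `Hint` is a hypothetical stand-in for the planner's hint type (only a name and key/value options are modeled), and `firstHint` is an invented name, so treat this as the shape of the idea, not the actual PinotHintStrategyTable method.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class FirstHintLookup {
  /** Hypothetical stand-in for a planner hint: a name plus key/value options. */
  public static final class Hint {
    public final String hintName;
    public final Map<String, String> kvOptions;

    public Hint(String hintName, Map<String, String> kvOptions) {
      this.hintName = hintName;
      this.kvOptions = kvOptions;
    }
  }

  private FirstHintLookup() {
  }

  /** Returns the first hint with the given name, following the order of the list. */
  public static Optional<Hint> firstHint(List<Hint> hints, String hintName) {
    // findFirst() respects the encounter order of the list; findAny() gives no such guarantee.
    return hints.stream().filter(hint -> hint.hintName.equals(hintName)).findFirst();
  }
}
```

On a sequential stream `findAny()` usually happens to return the first match too, but only `findFirst()` documents that behavior, which matters when the first hint is meant to win.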
// there is a hint, check fill default data and obtain the partition parallelism if supplied
Map<String, String> kvOptions = relHint.kvOptions;

ImmutableTableOptions newTableOptions = ImmutableTableOptions.copyOf(implicitTableOptions);
Should we override here? I think we just want to validate if the explicit hint matches the implicit one, and always use the implicit one? Partition parallelism should always come from explicit hint
This code is here in case there is an explicit hint, in which case the explicit value provided by the writer wins. That is what is called override here. The base ImmutableTableOptions is the one calculated from the table and does not include partition information. Then, for each dimension of TableOptions (key, function, etc.), the implicit value is overridden by the explicit value provided, if any.
The code that enforces a specific value is in WorkerManager. We could move it here, but I think that can be done in another PR.
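A minimal sketch of that override, using plain maps instead of the generated ImmutableTableOptions so it runs on its own. The `merge` method is hypothetical, and the option keys used in the usage note are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class TableOptionMerge {
  private TableOptionMerge() {
  }

  /**
   * Starts from the options inferred from the table and lets every option
   * written explicitly in the query hint replace the inferred value.
   */
  public static Map<String, String> merge(Map<String, String> implicit, Map<String, String> explicit) {
    Map<String, String> merged = new LinkedHashMap<>(implicit);
    // putAll replaces inferred values for keys the writer supplied and adds
    // keys that only exist in the explicit hint (for example, parallelism).
    merged.putAll(explicit);
    return merged;
  }
}
```

For example, merging an inferred `{partition_key=id, partition_size=4}` with an explicit `{partition_size=8, partition_parallelism=2}` keeps the inferred key, takes the explicit size, and adds the parallelism. Validation of the resulting values is deliberately left out here, mirroring the comment that enforcement lives in WorkerManager.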
}
switch (useImplicitColocatedOptionValue.toLowerCase()) {
  case "true":
    Objects.requireNonNull(workerManager, "WorkerManager is required for implicit colocated join");
Is it possible that we run into this query path without the worker manager being set? I still think this path should be handled the same way as instance level setting. Whether worker manager is set or not should be orthogonal to whether the setting is instance level or query level.
Now that the worker manager is enabled by default this is harder to hit, but if pinot.broker.enable.partition.metadata.manager is set to false and the query includes set implicitColocateJoin=true;, then we fail this check.
It is just a matter of failing fast. The writer asked to use this feature and it cannot be used because the config indicates that the worker manager should not be instantiated. We could just ignore the writer's request, but given this is not a hint but an option, I think it is more elegant to fail with a clear message.
Here the worker manager is read from the environment config, so whether it is null is not decided by this config, but purely based on the code path that initialized the QueryEnvironment
Sure. WorkerManager will be null or not depending on how QueryEnvironment is initialized.
But inferPartitionHint (the new name of useImplicitColocatedOptionValue) is decided by the query option. What I'm doing here is failing if WorkerManager was initialized to null AND the user is explicitly asking for inferPartitionHint. I think that is better than the alternative, which is to not infer the partition hint when explicitly asked because the worker manager wasn't enabled at startup time.
If I understand the intention correctly, we want to fail the planning if partitionMetadataManager is unavailable instead of workerManager. workerManager can be null in certain query paths independent of the value of partitionMetadataManager. We need to ensure a query with the option never hits that code path.
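The fail-fast behavior under discussion can be sketched as below. This is a self-contained illustration: `shouldInfer` and its parameters are hypothetical, the worker manager is modeled as a plain `Object`, and the handling of an unset or invalid option value is an assumption that the thread does not settle.

```java
import java.util.Locale;
import java.util.Objects;

public final class InferPartitionHintOption {
  private InferPartitionHintOption() {
  }

  /**
   * @param optionValue value of the query option, or null when the query does not set it
   * @param enabledByDefault instance-level default
   * @param workerManager the dependency needed to infer hints; may be null
   */
  public static boolean shouldInfer(String optionValue, boolean enabledByDefault, Object workerManager) {
    if (optionValue == null) {
      // Assumption: with no explicit request, silently skip when the dependency is missing.
      return enabledByDefault && workerManager != null;
    }
    switch (optionValue.toLowerCase(Locale.ROOT)) {
      case "true":
        // Explicit request: fail fast with a clear message instead of ignoring it.
        Objects.requireNonNull(workerManager, "WorkerManager is required to infer partition hints");
        return true;
      case "false":
        return false;
      default:
        throw new IllegalArgumentException("Invalid value for the option: " + optionValue);
    }
  }
}
```

The reviewer's point still applies to a sketch like this: if the dependency can be null for reasons unrelated to the broker config, the null check should target whatever actually reflects that config.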
LGTM
Right now, in order to use colocated joins, users need to provide the tableOption hint for each table in the join. This hint contains several attributes, and most of them are required even if the information can be obtained from the table config. In fact, we explicitly verify that the provided values are equal to what is stored in the table config.
This PR changes this optimization to be applied automatically. Although I've been studying other solutions, the simplest solution I've found is to create a rule that:
pinot.broker.enable.partition.metadata.manager is true
TODO: