[FLINK-37440] Fix the bug that parallelism.default do not always adopts 1 as the default value. #26277
base: master
ping @1996fanrui cc @davidradl
@@ -459,7 +459,10 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
 ConfigOptions.key("parallelism.default")
 .intType()
 .defaultValue(1)
-.withDescription("Default parallelism for jobs.");
+.withDescription("Default parallelism for jobs. There is an exception here. When creating a"
I am not sure what the new description is saying:
- Is the default ever 1? If the default is not always 1, I suggest we consider removing the default, though this would have migration implications that we would need to think through.
- The phrase "There is an exception here.": I would not use the word "exception", as it is an overloaded word that implies there might be an error of some sort.
- I do not think the sentence "the return value of Runtime.getRuntime().availableProcessors()" should be included; it refers to Java code, which would not be very understandable for someone trying to configure this.
Could you please verify in which cases 1 is the default and when the parallelism should be the number of available processors? It would seem to me that using the available processors for parallelism would make a lot of sense in all cases where a value has not been supplied. Is this the case?
Yes, the default is not always 1. Because removing the default value would be a breaking change, I just updated the description.
Yeah, the word "exception" looks weird. How about "special case"?
I checked all the code and found two special cases. I have pasted the two cases into the PR description.
@beliefer when you say special cases, do you mean that there are three scenarios: default = 1, case 1, and case 2? If default = 1 never happens, then we could remove it or deprecate it.
default=1 is still the main behavior; removing it would break API compatibility.
Personally, I want to remove the .defaultValue(1)
f73f080 to 25a7216
@@ -495,7 +495,6 @@ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
 @Override
 public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
 return (pipeline, config, classLoader) -> {
-final int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
This line is not used at all.
@flinkbot run azure
@@ -459,7 +459,8 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
 ConfigOptions.key("parallelism.default")
 .intType()
 .defaultValue(1)
-.withDescription("Default parallelism for jobs.");
+.withDescription(
From the above link, we can see why CI failed: Documentation is outdated, please regenerate it according to the instructions in flink-docs/README.md.
After updating the description, the documentation should be regenerated as well by running mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests -Pskip-webui-build.
Thank you for the reminder.
@@ -74,7 +74,7 @@
 <td><h5>parallelism.default</h5></td>
 <td style="word-wrap: break-word;">1</td>
 <td>Integer</td>
-<td>Default parallelism for jobs.</td>
+<td>Default parallelism for jobs. There are two special case. The first case is when creating a StreamExecutioneEnvironment, if the value of parallelism.default is not explicitly specified, the number of processors available to the Java virtual machine will be used as the default parallelism. The second case is when creating mini cluster, the number of slots will be used as the default parallelism</td>
nits:
"There are two special case." -> "There are two cases."
"The first case is when creating a StreamExecutioneEnvironment, if the value of parallelism.default is not explicitly specified, the number of processors available to the Java virtual machine will be used as the default parallelism." ->
"In the first case, when creating a StreamExecutioneEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used."
"The second case is when creating mini cluster, the number of slots will be used as the default parallelism"
-> I am not sure what you mean by mini cluster. The slots are on a task manager; how does this relate?
It's the total number of slots in the mini cluster.
I see docs that we should update too, around "setParallelism(int parallelism) Set the default parallelism for the job."
@@ -459,7 +459,8 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
 ConfigOptions.key("parallelism.default")
 .intType()
 .defaultValue(1)
-.withDescription("Default parallelism for jobs.");
+.withDescription(
Are we removing .defaultValue(1), since according to your description above it is never used?
No, it is used in most scenarios.
Personally, I want to remove the .defaultValue(1), but I am concerned about API compatibility.
I will do it if more committers suggest removing it.
+1 to keeping the default value, as it is used in some scenarios.
I would add an example of an existing option[1] here: jobmanager.scheduler. Although its default value is Default, the default is changed to AdaptiveBatch for Flink batch jobs. The description is:
Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.
I think it's similar to parallelism.default: there are some special cases that don't respect the default value.
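The pattern described in that comment, an option whose effective default depends on context when it is not explicitly set, can be sketched roughly as follows. This is a minimal standalone illustration, not Flink code: the class `SchedulerDefaultSketch`, the enum `JobType`, and the method `effectiveScheduler` are hypothetical names, and only the two scheduler names come from the quoted description.

```java
import java.util.Optional;

/** Hypothetical sketch; not part of Flink. */
final class SchedulerDefaultSketch {

    /** Illustrative job kinds (assumption, named after the quoted description). */
    enum JobType { BATCH, STREAMING }

    /**
     * Returns the explicitly configured scheduler if there is one; otherwise
     * falls back to a default that depends on the job type.
     */
    static String effectiveScheduler(Optional<String> explicit, JobType jobType) {
        return explicit.orElse(jobType == JobType.BATCH ? "AdaptiveBatch" : "Default");
    }

    public static void main(String[] args) {
        // Not explicitly set: the fallback depends on the job type.
        System.out.println(effectiveScheduler(Optional.empty(), JobType.BATCH));     // AdaptiveBatch
        System.out.println(effectiveScheduler(Optional.empty(), JobType.STREAMING)); // Default
        // Explicitly set: the configured value always wins.
        System.out.println(effectiveScheduler(Optional.of("Default"), JobType.BATCH)); // Default
    }
}
```

The point of the sketch is that the documented default only describes one branch of the fallback, which is why the option description has to spell out the others.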
Got it.
<td><h5>sink.committer.retries</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.</td>
nits:
I see we have copied:
"committable operations (such as transactions)". I don't think this reads well. An operation can take part in a transaction, but it is not a transaction.
"Flink fails" -> don't we mean that the Flink application / job fails?
This change was generated by mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests -Pskip-webui-build. I don't know the reason yet.
@@ -470,7 +470,8 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
 ConfigOptions.key("parallelism.default")
 .intType()
 .defaultValue(1)
-.withDescription("Default parallelism for jobs.");
+.withDescription(
+"Default parallelism for jobs. There are two special cases. In the first case, when creating a StreamExecutioneEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used. In the second case, when creating mini cluster without specifying parallelism.default, the total number of slots in mini cluster will be used.");
I have a question about the first case: does it cover all running modes?
That is, application and session mode, on standalone, YARN, or Kubernetes. Doesn't parallelism.default take effect in all of these cases?
No. The first case is only found in local mode.
@@ -51,7 +51,7 @@ The following configuration options are available: (the default is bold)
 With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. This will lead to exceptions by the serializer. The settings are:
 `NONE`: disable the closure cleaner completely, `TOP_LEVEL`: clean only the top-level class without recursing into fields, `RECURSIVE`: clean all the fields recursively.

-- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.
+- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job. There are two special cases. In the first case, when creating a StreamExecutioneEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used. In the second case, when creating mini cluster without specifying parallelism.default, the total number of slots in mini cluster will be used.
IIUC, if setParallelism(int parallelism) is called, the default parallelism will be changed. I don't understand why these two cases are mentioned here.
According to the original comment, users understand that the parallelism will be 1 if it is not explicitly set. Users will be confused if they find a higher parallelism being used.
…ts 1 as the default value.
@1996fanrui Could you review this again?
What is the purpose of the change
This PR proposes to fix the bug that parallelism.default does not always adopt 1 as the default value.
The config parallelism.default is defined as follows.
ConfigOptions.key("parallelism.default")
.intType()
.defaultValue(1)
.withDescription("Default parallelism for jobs.");
This means the value of parallelism.default will be 1 if users do not specify it.
But the default is not always 1. I checked all the code and found two special cases.
- createLocalEnvironment in the StreamExecutionEnvironment at line 2248.
- registerEnv in the MiniClusterExtension at line 264.
I think DEFAULT_PARALLELISM is an open API, so we'd better not change the behavior. So I added extra comments for this config.
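The three scenarios discussed in this PR (the declared default of 1 plus the two special cases) can be modeled with a small standalone sketch. It is not the Flink implementation: `EffectiveParallelismSketch`, `Context`, and `resolve` are hypothetical names chosen for illustration, and the sketch assumes, as the updated description states, that an explicitly specified parallelism.default always takes precedence.

```java
import java.util.Optional;

/** Hypothetical model of the behavior described in this PR; not Flink code. */
final class EffectiveParallelismSketch {

    /** Where the job is being set up (illustrative names, assumptions). */
    enum Context { REGULAR, LOCAL_ENVIRONMENT, MINI_CLUSTER }

    /** Mirrors .defaultValue(1) in the option definition. */
    static final int DECLARED_DEFAULT = 1;

    /**
     * Picks the parallelism that applies when a job does not set one itself.
     *
     * @param explicit         value of parallelism.default if the user specified it
     * @param context          how the environment was created
     * @param miniClusterSlots total number of slots in the mini cluster
     */
    static int resolve(Optional<Integer> explicit, Context context, int miniClusterSlots) {
        if (explicit.isPresent()) {
            return explicit.get(); // an explicit setting always wins
        }
        switch (context) {
            case LOCAL_ENVIRONMENT:
                // Special case 1: processors available to the JVM.
                return Runtime.getRuntime().availableProcessors();
            case MINI_CLUSTER:
                // Special case 2: total slots of the mini cluster.
                return miniClusterSlots;
            default:
                return DECLARED_DEFAULT;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), Context.REGULAR, 4));           // 1
        System.out.println(resolve(Optional.empty(), Context.MINI_CLUSTER, 4));      // 4
        System.out.println(resolve(Optional.of(8), Context.LOCAL_ENVIRONMENT, 4));   // 8
        // Machine dependent, so no expected value is shown here.
        System.out.println(resolve(Optional.empty(), Context.LOCAL_ENVIRONMENT, 4));
    }
}
```

Only the REGULAR branch matches what a reader would infer from the documented default of 1, which is the mismatch the description change is meant to explain.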
Brief change log
Fix the bug that parallelism.default does not always adopt 1 as the default value.
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (no)
Documentation