[FLINK-37440] Fix the bug that parallelism.default does not always adopt 1 as the default value. #26277

Open
wants to merge 1 commit into
base: master
@@ -51,7 +51,7 @@ The following configuration options are available: (the default is bold)
With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. This will lead to exceptions by the serializer. The settings are:
`NONE`: disable the closure cleaner completely, `TOP_LEVEL`: clean only the top-level class without recursing into fields, `RECURSIVE`: clean all the fields recursively.

- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.
- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job. There are two special cases. In the first case, when creating a LocalStreamEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used. In the second case, when creating mini cluster without specifying parallelism.default, the total number of slots in mini cluster will be used.

- `getMaxParallelism()` / `setMaxParallelism(int parallelism)` Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and specifies the upper limit for dynamic scaling.

2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
@@ -74,7 +74,7 @@
<td><h5>parallelism.default</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Default parallelism for jobs.</td>
<td>Default parallelism for jobs. There are two special cases. In the first case, when creating a LocalStreamEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used. In the second case, when creating mini cluster without specifying parallelism.default, the total number of slots in mini cluster will be used.</td>
</tr>
</tbody>
</table>
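
The fallback order stated in the new `parallelism.default` description can be sketched as follows. This is a minimal, self-contained illustration and not Flink's actual implementation: the class `DefaultParallelismSketch`, the enum `EnvironmentKind`, and the method `resolve` are hypothetical names introduced only for this example. The sketch assumes that an explicitly configured value always wins and that the two special cases apply only when the option is unset.

```java
import java.util.OptionalInt;

public class DefaultParallelismSketch {

    /** Hypothetical labels for the situations named in the option description. */
    public enum EnvironmentKind { LOCAL_STREAM_ENVIRONMENT, MINI_CLUSTER, OTHER }

    /** The default value declared for parallelism.default. */
    public static final int DECLARED_DEFAULT = 1;

    /**
     * Resolves the effective default parallelism.
     *
     * @param configured       the explicitly configured value, or empty if the option is unset
     * @param kind             which kind of environment is being created (hypothetical model)
     * @param miniClusterSlots total number of slots in the mini cluster (used only for MINI_CLUSTER)
     */
    public static int resolve(OptionalInt configured, EnvironmentKind kind, int miniClusterSlots) {
        if (configured.isPresent()) {
            // An explicit setting takes precedence over every fallback.
            return configured.getAsInt();
        }
        switch (kind) {
            case LOCAL_STREAM_ENVIRONMENT:
                // Special case 1: number of processors available to the JVM.
                return Runtime.getRuntime().availableProcessors();
            case MINI_CLUSTER:
                // Special case 2: total number of slots in the mini cluster.
                return miniClusterSlots;
            default:
                // Everywhere else the declared default applies.
                return DECLARED_DEFAULT;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalInt.of(4), EnvironmentKind.MINI_CLUSTER, 8));   // explicit value
        System.out.println(resolve(OptionalInt.empty(), EnvironmentKind.MINI_CLUSTER, 8)); // slot count
        System.out.println(resolve(OptionalInt.empty(), EnvironmentKind.OTHER, 8));        // declared default
        // Machine-dependent result:
        System.out.println(resolve(OptionalInt.empty(), EnvironmentKind.LOCAL_STREAM_ENVIRONMENT, 8));
    }
}
```

The last call in `main` depends on the machine it runs on, which is why the description cannot state a single number for the local case.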
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/sink_configuration.html
@@ -0,0 +1,18 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>sink.committer.retries</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.</td>
Contributor

Nits:
I see we have copied "committable operations (such as transactions)". I don't think this reads well: an operation can take part in a transaction, but it is not a transaction.

"Flink fails": don't we mean that the Flink application / job fails?

Contributor Author

This change was generated by `mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests -Pskip-webui-build`. I don't know the reason yet.

</tr>
</tbody>
</table>
@@ -495,7 +495,6 @@ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
@Override
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, config, classLoader) -> {
final int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
Contributor Author

This line is not used at all.

final JobGraph jobGraph = streamGraph.getJobGraph();
// The job graphs from different cases are generated from the same stream
// graph, resulting in the same job ID, which can lead to exceptions.
@@ -470,7 +470,8 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
ConfigOptions.key("parallelism.default")
.intType()
.defaultValue(1)
.withDescription("Default parallelism for jobs.");
.withDescription(
Member

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=66528&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&s=ae4f8708-9994-57d3-c2d7-b892156e7812&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=31682

The link above shows why CI failed: "Documentation is outdated, please regenerate it according to the instructions in flink-docs/README.md."

After updating the description, the generated documentation should be updated along with it by running `mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests -Pskip-webui-build`.

Contributor Author

Thank you for the reminder.

Contributor

Are we removing `.defaultValue(1)`? From your description above, it is never used.

Contributor Author

No, it is used in most scenarios.
Personally, I would like to remove `.defaultValue(1)`, but I am concerned about API compatibility.
I will do it if more committers suggest removing it.

Member

+1 to keeping the default value, as it is used in some scenarios.

Member

@1996fanrui, Mar 13, 2025

I would add an example of an existing option [1] here: `jobmanager.scheduler`. Although its default value is `Default`, the default is changed to `AdaptiveBatch` for Flink batch jobs. Its description is:

Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.

I think `parallelism.default` is similar: there are some special cases that don't respect the default value.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#jobmanager-scheduler

Contributor Author

Got it.

"Default parallelism for jobs. There are two special cases. In the first case, when creating a LocalStreamEnvironment without specifying parallelism.default, the number of processors available to the Java virtual machine will be used. In the second case, when creating mini cluster without specifying parallelism.default, the total number of slots in mini cluster will be used.");

// ------------------------------------------------------------------------
// file systems
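
The thread above turns on one point: an option can keep a declared default of 1 while special cases still need to know whether the user set it explicitly. A lookup that always falls back to the declared default cannot tell "set to 1" apart from "not set". The sketch below illustrates that distinction with a plain `Map`. It is an assumption-laden illustration, not Flink code: `ExplicitOrDefaultSketch`, `explicitValue`, and `valueOrDeclaredDefault` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ExplicitOrDefaultSketch {

    public static final String KEY = "parallelism.default";
    public static final int DECLARED_DEFAULT = 1;

    /** Returns the value only if the user set the key explicitly. */
    public static Optional<Integer> explicitValue(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(KEY)).map(String::trim).map(Integer::parseInt);
    }

    /** Returns the explicit value, or the declared default when the key is unset. */
    public static int valueOrDeclaredDefault(Map<String, String> conf) {
        return explicitValue(conf).orElse(DECLARED_DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, String> unset = new HashMap<>();
        Map<String, String> setToOne = new HashMap<>();
        setToOne.put(KEY, "1");

        // Both lookups yield 1, so this view hides whether the user chose the value.
        System.out.println(valueOrDeclaredDefault(unset));
        System.out.println(valueOrDeclaredDefault(setToOne));

        // The optional view keeps the distinction that a special case would need.
        System.out.println(explicitValue(unset).isPresent());
        System.out.println(explicitValue(setToOne).isPresent());
    }
}
```

Under this model, keeping the declared default costs nothing for compatibility: callers that only need a number still get 1, while an environment with a special case can check for an explicit value first.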