Conversation

@sduvvuri1603 sduvvuri1603 commented Nov 12, 2025

Summary

  • add pipeline_run_parallelism to dsl.PipelineConfig (proto, Go, Python) and thread it through the compiler/runtime so the value lands in the workflow bundle and Kubernetes platform spec
  • persist the limit in kfp-pipeline-config and apply it to Argo spec.parallelism; extend backend tests and regenerate proto goldens
  • update docs and samples to account for the new config

What pipeline_run_parallelism does

pipeline_run_parallelism lets authors cap the number of tasks that can execute concurrently in a pipeline run. When set, the value is stored in kfp-pipeline-config and the compiler writes it to the workflow manifest (spec.parallelism) so Argo enforces the limit.
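Conceptually, the cap behaves like a counting semaphore over task slots. The following self-contained Python sketch (illustrative only, no KFP imports; all names are made up) shows how such a limit bounds concurrent workers, analogous to what Argo does with `spec.parallelism`:

```python
import threading
import time


def run_with_cap(tasks, parallelism):
    """Run callables while never exceeding `parallelism` concurrent workers."""
    slots = threading.Semaphore(parallelism)
    lock = threading.Lock()
    active = 0
    peak = 0  # highest concurrency actually observed
    results = []

    def worker(task):
        nonlocal active, peak
        with slots:  # blocks once `parallelism` slots are taken
            with lock:
                active += 1
                peak = max(peak, active)
            try:
                results.append(task())
            finally:
                with lock:
                    active -= 1

    threads = [threading.Thread(target=worker, args=(t,)) for t in tasks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, peak


# Eight short tasks, capped at three at a time.
results, peak = run_with_cap(
    [lambda i=i: (time.sleep(0.05), i)[1] for i in range(8)], parallelism=3)
```

The semaphore guarantees `peak` never exceeds the cap, which is the same invariant Argo enforces on pods of a single workflow.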

Validation

  • SDK compilation golden (pipeline_with_run_parallelism.yaml) shows pipelineRunParallelism in the compiled spec
  • Backend compiler golden updates confirm spec.parallelism is populated
  • TestCreateRun_PipelineRunParallelismConfigMap exercises the config map persistence path
  • Manual before/after runs verify the config map entry and resulting workflow manifest

@google-oss-prow

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow

Hi @sduvvuri1603. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@sduvvuri1603 sduvvuri1603 force-pushed the feature/pipeline-run-parallelism branch 2 times, most recently from 99f2fc8 to d34a1b2 Compare November 12, 2025 21:22
@alyssacgoins
Contributor

/retest

@sduvvuri1603 sduvvuri1603 force-pushed the feature/pipeline-run-parallelism branch 7 times, most recently from 82756e1 to 60a35d8 Compare November 14, 2025 21:27
@google-oss-prow

@sduvvuri1603: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message.

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@hbelmiro
Contributor

/ok-to-test

@hbelmiro
Contributor

/retest

@sduvvuri1603 sduvvuri1603 marked this pull request as ready for review November 17, 2025 17:06
@sduvvuri1603 sduvvuri1603 marked this pull request as draft November 17, 2025 17:06
subjects:
- kind: ServiceAccount
name: ml-pipeline
namespace: kubeflow
Contributor:

Is it intentional to hardcode the namespace? If I deploy into a different namespace, is this value replaced or overridden somewhere?

Contributor (Author):

No, it was not intentional.
Thanks for pointing that out! I'll change this binding to use a namespace variable ($(kfp-namespace)) so it follows whatever namespace users deploy into.

@nsingla (Contributor) left a comment:

I think we need a pipeline in test_data/valid/*compiled/essential that overrides this value.

@sduvvuri1603 sduvvuri1603 force-pushed the feature/pipeline-run-parallelism branch from 60a35d8 to 39fb3dd Compare November 19, 2025 21:24
@sduvvuri1603 sduvvuri1603 force-pushed the feature/pipeline-run-parallelism branch from 39fb3dd to c587b03 Compare November 19, 2025 21:32

Consult the [Python SDK reference docs](https://kubeflow-pipelines.readthedocs.io/en/stable/) when writing pipelines using the Python SDK.

> New in master: `dsl.PipelineConfig` now accepts an optional `pipeline_run_parallelism` integer to cap concurrent task execution for a run. The backend stores the requested limit in a shared ConfigMap and surfaces it to Argo Workflows via `spec.parallelism`.
Contributor:

It may be more appropriate to add this entry to the CHANGELOG.

Contributor (Author):

Sure, but will this go in a new section called "Unreleased Features"? I only see version release details in the file.

Contributor:

I believe the PR will be included here as part of the release process. @mprahl , could you confirm if that’s correct?


@pipeline_run_parallelism.setter
def pipeline_run_parallelism(self, value: Optional[int]) -> None: # pylint: disable=attribute-defined-outside-init
if value is None:
Contributor:

Contributor (Author):

Yes, you're right about the serialization part. But we need this guard to prevent a crash during initialization: since __init__ passes None by default, removing this check would cause it to hit the isinstance line and fail immediately. It just ensures we can safely create the object with no value set.

Contributor:

I guess then you can just add one if statement:

        if value is not None:
            if not isinstance(value, int):
                raise ValueError(
                    'pipeline_run_parallelism must be an integer if specified.')
            if value <= 0:
                raise ValueError(
                    'pipeline_run_parallelism must be a positive integer.')
        self._pipeline_run_parallelism = value
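For context, the None guard discussed above fits into a property pair like the following self-contained sketch (a hypothetical standalone class for illustration, not the actual dsl.PipelineConfig source):

```python
from typing import Optional


class PipelineConfig:
    """Minimal stand-in illustrating the pipeline_run_parallelism guard."""

    def __init__(self, pipeline_run_parallelism: Optional[int] = None):
        # __init__ passes None by default, so the setter must accept None.
        self.pipeline_run_parallelism = pipeline_run_parallelism

    @property
    def pipeline_run_parallelism(self) -> Optional[int]:
        return self._pipeline_run_parallelism

    @pipeline_run_parallelism.setter
    def pipeline_run_parallelism(self, value: Optional[int]) -> None:
        # Only validate when a value is actually supplied; None means "unset".
        if value is not None:
            if not isinstance(value, int):
                raise ValueError(
                    'pipeline_run_parallelism must be an integer if specified.')
            if value <= 0:
                raise ValueError(
                    'pipeline_run_parallelism must be a positive integer.')
        self._pipeline_run_parallelism = value
```

Note that `if value is not None:` (rather than a bare truthiness check) ensures that an explicit 0 still reaches the positive-integer validation instead of being stored silently.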

name="pipeline-with-workspace",
description="A pipeline that demonstrates workspace functionality",
pipeline_config=dsl.PipelineConfig(
pipeline_run_parallelism=3,
Contributor:

Can we actually set this to None here, since we have an explicit test for positive values?

Contributor (Author):

Yep, that makes sense. Will set this to None.

@nsingla
Contributor

nsingla commented Nov 21, 2025

@sduvvuri1603 can you please add what this config is supposed to do to the PR description, and a section about how you've validated the functionality?


@dsl.pipeline(
name='pipeline-with-run-parallelism',
pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=7),
Contributor:

Isn't 7 too high when the number of tasks in this pipeline is just 1? Maybe you should add more components to it, or add a ParallelFor loop that iterates over more than pipeline_run_parallelism constants, so that we can validate that the config actually works.
Also, what validation logic did you add to confirm the number of tasks created for a pipeline with this config?

@sduvvuri1603 (Contributor, Author) Nov 21, 2025:

This specific test case is part of the SDK compilation suite and verifies that the pipeline_run_parallelism field is correctly serialized from the Python SDK into the compiled YAML's PlatformSpec. It relies on the golden-file comparison for validation here (ensuring the YAML contains pipelineRunParallelism: 7 correctly populated).

(My understanding is that this is not related to the actual runtime limit, which is covered by the backend integration tests where we submit these workflows to Argo.) Please let me know if this is correct!

Contributor:

Any pipeline yaml file in this directory will be part of the end to end tests, so yes, the workflow will get submitted to argo.
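For reference, the end-to-end effect described in this thread is a populated spec.parallelism on the submitted Argo Workflow. A sketch of the relevant part of the manifest (metadata and entrypoint names are illustrative; the exact structure of the generated workflow may differ):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-with-run-parallelism-
spec:
  # Written by the KFP backend from dsl.PipelineConfig(pipeline_run_parallelism=7);
  # Argo then allows at most 7 pods of this workflow to run concurrently.
  parallelism: 7
  entrypoint: root
```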
