Skip to content

Commit 99f2fc8

Browse files
committed
feat: add pipeline run parallelism config
Signed-off-by: sduvvuri1603 <[email protected]>
1 parent f20abd4 commit 99f2fc8

File tree

19 files changed

+263
-17
lines changed

19 files changed

+263
-17
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ See the Kubeflow [Pipelines API doc](https://www.kubeflow.org/docs/components/pi
4242

4343
Consult the [Python SDK reference docs](https://kubeflow-pipelines.readthedocs.io/en/stable/) when writing pipelines using the Python SDK.
4444

45+
> New in master: `dsl.PipelineConfig` now accepts an optional `pipeline_run_parallelism` integer to cap concurrent task execution for a run. The backend stores the requested limit in a shared ConfigMap and surfaces it to Argo Workflows via `spec.parallelism`.
46+
4547
## Deep Wiki
4648
Check out our AI Powered repo documentation on [DeepWiki](https://deepwiki.com/kubeflow/pipelines).
4749

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 18 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,4 +1173,8 @@ message PipelineConfig {
11731173
// Configuration for a shared storage workspace that persists for the duration of the pipeline run.
11741174
// The workspace can be configured with size and Kubernetes-specific settings to override default PVC configurations.
11751175
optional WorkspaceConfig workspace = 2;
1176+
1177+
// Maximum number of tasks that can be scheduled simultaneously for a single
1178+
// pipeline run.
1179+
optional int32 pipeline_run_parallelism = 3;
11761180
}

backend/src/apiserver/client/kubernetes_core.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
type KubernetesCoreInterface interface {
1313
PodClient(namespace string) v1.PodInterface
14+
ConfigMapClient(namespace string) v1.ConfigMapInterface
1415
}
1516

1617
type KubernetesCore struct {
@@ -21,6 +22,10 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
2122
return c.coreV1Client.Pods(namespace)
2223
}
2324

25+
func (c *KubernetesCore) ConfigMapClient(namespace string) v1.ConfigMapInterface {
26+
return c.coreV1Client.ConfigMaps(namespace)
27+
}
28+
2429
func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) {
2530
clientSet, err := getKubernetesClientset(clientParams)
2631
if err != nil {

backend/src/apiserver/client/kubernetes_core_fake.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"github.com/kubeflow/pipelines/backend/src/common/util"
2121
policyv1 "k8s.io/api/policy/v1"
2222
policyv1beta1 "k8s.io/api/policy/v1beta1"
23+
kubernetesfake "k8s.io/client-go/kubernetes/fake"
2324
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
2425
)
2526

2627
type FakeKuberneteCoreClient struct {
2728
podClientFake *FakePodClient
29+
coreClient v1.CoreV1Interface
2830
}
2931

3032
func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
@@ -34,22 +36,39 @@ func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
3436
return c.podClientFake
3537
}
3638

39+
func (c *FakeKuberneteCoreClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
40+
return c.coreClient.ConfigMaps(namespace)
41+
}
42+
3743
func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
38-
return &FakeKuberneteCoreClient{&FakePodClient{}}
44+
clientset := kubernetesfake.NewSimpleClientset()
45+
return &FakeKuberneteCoreClient{
46+
podClientFake: &FakePodClient{},
47+
coreClient: clientset.CoreV1(),
48+
}
3949
}
4050

4151
type FakeKubernetesCoreClientWithBadPodClient struct {
4252
podClientFake *FakeBadPodClient
53+
coreClient v1.CoreV1Interface
4354
}
4455

4556
func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient {
46-
return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}}
57+
clientset := kubernetesfake.NewSimpleClientset()
58+
return &FakeKubernetesCoreClientWithBadPodClient{
59+
podClientFake: &FakeBadPodClient{},
60+
coreClient: clientset.CoreV1(),
61+
}
4762
}
4863

4964
func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface {
5065
return c.podClientFake
5166
}
5267

68+
func (c *FakeKubernetesCoreClientWithBadPodClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
69+
return c.coreClient.ConfigMaps(namespace)
70+
}
71+
5372
func (c *FakePodClient) EvictV1(context.Context, *policyv1.Eviction) error {
5473
return nil
5574
}

backend/src/apiserver/resource/resource_manager.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ import (
5050
utilerrors "k8s.io/apimachinery/pkg/util/errors"
5151
)
5252

53+
const pipelineParallelismConfigMapName = "kfp-pipeline-config"
54+
5355
// Metric variables. Please prefix the metric names with resource_manager_.
5456
var (
5557
extraLabels = []string{
@@ -153,6 +155,65 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
153155
}
154156
}
155157

158+
func workflowParallelism(executionSpec util.ExecutionSpec) (int64, bool) {
159+
if executionSpec == nil {
160+
return 0, false
161+
}
162+
wf, ok := executionSpec.(*util.Workflow)
163+
if !ok || wf == nil || wf.Spec.Parallelism == nil {
164+
return 0, false
165+
}
166+
return *wf.Spec.Parallelism, true
167+
}
168+
169+
func (r *ResourceManager) upsertPipelineParallelismConfigMap(ctx context.Context, namespace, key string, parallelism int64) error {
170+
if key == "" {
171+
return nil
172+
}
173+
174+
configMaps := r.k8sCoreClient.ConfigMapClient(namespace)
175+
value := strconv.FormatInt(parallelism, 10)
176+
177+
for {
178+
current, err := configMaps.Get(ctx, pipelineParallelismConfigMapName, v1.GetOptions{})
179+
if apierrors.IsNotFound(err) {
180+
newConfigMap := &corev1.ConfigMap{
181+
ObjectMeta: v1.ObjectMeta{
182+
Name: pipelineParallelismConfigMapName,
183+
Namespace: namespace,
184+
},
185+
Data: map[string]string{
186+
key: value,
187+
},
188+
}
189+
if _, err := configMaps.Create(ctx, newConfigMap, v1.CreateOptions{}); apierrors.IsAlreadyExists(err) {
190+
continue
191+
} else if err != nil {
192+
return util.Wrap(err, "failed to create pipeline parallelism ConfigMap entry")
193+
}
194+
return nil
195+
}
196+
if err != nil {
197+
return util.Wrap(err, "failed to retrieve pipeline parallelism ConfigMap")
198+
}
199+
200+
if current.Data == nil {
201+
current.Data = map[string]string{}
202+
}
203+
if existing, ok := current.Data[key]; ok && existing == value {
204+
return nil
205+
}
206+
207+
current.Data[key] = value
208+
if _, err := configMaps.Update(ctx, current, v1.UpdateOptions{}); apierrors.IsConflict(err) {
209+
continue
210+
} else if err != nil {
211+
return util.Wrap(err, "failed to update pipeline parallelism ConfigMap entry")
212+
}
213+
return nil
214+
}
215+
}
216+
156217
func (r *ResourceManager) getWorkflowClient(namespace string) util.ExecutionInterface {
157218
return r.execClient.Execution(namespace)
158219
}
@@ -562,6 +623,13 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
562623
}
563624
executionSpec.SetExecutionNamespace(k8sNamespace)
564625

626+
if parallelism, ok := workflowParallelism(executionSpec); ok && parallelism > 0 && run.PipelineSpec.PipelineVersionId != "" {
627+
if err := r.upsertPipelineParallelismConfigMap(
628+
ctx, k8sNamespace, run.PipelineSpec.PipelineVersionId, parallelism); err != nil {
629+
return nil, util.Wrap(err, "Failed to persist pipeline_run_parallelism configuration")
630+
}
631+
}
632+
565633
// assign OwnerReference to scheduledworkflow
566634
if run.RecurringRunId != "" {
567635
job, err := r.jobStore.GetJob(run.RecurringRunId)

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,66 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) {
19291929
assert.Equal(t, expectedRunDetail.ToV1(), runDetail.ToV1(), "CreateRun stored invalid data in database")
19301930
}
19311931

1932+
func TestCreateRun_PipelineRunParallelismConfigMap(t *testing.T) {
1933+
initEnvVars()
1934+
1935+
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
1936+
defer store.Close()
1937+
1938+
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
1939+
1940+
pipeline, err := manager.CreatePipeline(createPipeline("parallelism-pipeline", "", "kubeflow"))
1941+
assert.NoError(t, err)
1942+
1943+
pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore)
1944+
assert.True(t, ok)
1945+
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))
1946+
1947+
pv := createPipelineVersion(
1948+
pipeline.UUID,
1949+
"parallelism-version",
1950+
"",
1951+
"",
1952+
v2SpecHelloWorldWithParallelism,
1953+
"",
1954+
"kubeflow",
1955+
)
1956+
version, err := manager.CreatePipelineVersion(pv)
1957+
assert.NoError(t, err)
1958+
1959+
run := &model.Run{
1960+
DisplayName: "run-with-parallelism",
1961+
Namespace: "kubeflow",
1962+
PipelineSpec: model.PipelineSpec{
1963+
PipelineId: pipeline.UUID,
1964+
PipelineVersionId: version.UUID,
1965+
RuntimeConfig: model.RuntimeConfig{
1966+
Parameters: "{\"text\":\"hello\"}",
1967+
},
1968+
},
1969+
}
1970+
1971+
createdRun, err := manager.CreateRun(context.Background(), run)
1972+
assert.NoError(t, err)
1973+
assert.NotNil(t, createdRun)
1974+
1975+
configMap, err := store.k8sCoreClientFake.ConfigMapClient("kubeflow").Get(
1976+
context.Background(), pipelineParallelismConfigMapName, v1.GetOptions{})
1977+
assert.NoError(t, err)
1978+
if assert.NotNil(t, configMap.Data) {
1979+
assert.Equal(t, "5", configMap.Data[version.UUID])
1980+
}
1981+
1982+
execSpec, err := store.ExecClientFake.Execution("kubeflow").Get(
1983+
context.Background(), createdRun.K8SName, v1.GetOptions{})
1984+
assert.NoError(t, err)
1985+
workflow, ok := execSpec.(*util.Workflow)
1986+
assert.True(t, ok)
1987+
if assert.NotNil(t, workflow.Spec.Parallelism) {
1988+
assert.Equal(t, int64(5), *workflow.Spec.Parallelism)
1989+
}
1990+
}
1991+
19321992
func TestCreateRun_ThroughPipelineIdAndPipelineVersion(t *testing.T) {
19331993
// Create experiment, pipeline, and pipeline version.
19341994
store, manager, experiment, pipeline, _ := initWithExperimentAndPipeline(t)
@@ -4138,6 +4198,14 @@ schemaVersion: 2.1.0
41384198
sdkVersion: kfp-1.6.5
41394199
`
41404200

4201+
var v2SpecHelloWorldWithParallelism = v2SpecHelloWorld + `
4202+
---
4203+
platforms:
4204+
kubernetes:
4205+
pipelineConfig:
4206+
pipelineRunParallelism: 5
4207+
`
4208+
41414209
var v2SpecHelloWorldMutated = `
41424210
components:
41434211
comp-hello-world:

backend/src/cache/client/kubernetes_core.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
type KubernetesCoreInterface interface {
1515
PodClient(namespace string) v1.PodInterface
16+
ConfigMapClient(namespace string) v1.ConfigMapInterface
1617
}
1718

1819
type KubernetesCore struct {
@@ -23,6 +24,10 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
2324
return c.coreV1Client.Pods(namespace)
2425
}
2526

27+
func (c *KubernetesCore) ConfigMapClient(namespace string) v1.ConfigMapInterface {
28+
return c.coreV1Client.ConfigMaps(namespace)
29+
}
30+
2631
func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) {
2732
restConfig, err := util.GetKubernetesConfig()
2833
if err != nil {

backend/src/cache/client/kubernetes_core_fake.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"github.com/kubeflow/pipelines/backend/src/common/util"
2121
policyv1 "k8s.io/api/policy/v1"
2222
policyv1beta1 "k8s.io/api/policy/v1beta1"
23+
kubernetesfake "k8s.io/client-go/kubernetes/fake"
2324
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
2425
)
2526

2627
type FakeKuberneteCoreClient struct {
2728
podClientFake *FakePodClient
29+
coreClient v1.CoreV1Interface
2830
}
2931

3032
func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
@@ -34,22 +36,39 @@ func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
3436
return c.podClientFake
3537
}
3638

39+
func (c *FakeKuberneteCoreClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
40+
return c.coreClient.ConfigMaps(namespace)
41+
}
42+
3743
func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
38-
return &FakeKuberneteCoreClient{&FakePodClient{}}
44+
clientset := kubernetesfake.NewSimpleClientset()
45+
return &FakeKuberneteCoreClient{
46+
podClientFake: &FakePodClient{},
47+
coreClient: clientset.CoreV1(),
48+
}
3949
}
4050

4151
type FakeKubernetesCoreClientWithBadPodClient struct {
4252
podClientFake *FakeBadPodClient
53+
coreClient v1.CoreV1Interface
4354
}
4455

4556
func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient {
46-
return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}}
57+
clientset := kubernetesfake.NewSimpleClientset()
58+
return &FakeKubernetesCoreClientWithBadPodClient{
59+
podClientFake: &FakeBadPodClient{},
60+
coreClient: clientset.CoreV1(),
61+
}
4762
}
4863

4964
func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface {
5065
return c.podClientFake
5166
}
5267

68+
func (c *FakeKubernetesCoreClientWithBadPodClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
69+
return c.coreClient.ConfigMaps(namespace)
70+
}
71+
5372
func (c *FakePodClient) EvictV1(context.Context, *policyv1.Eviction) error {
5473
return nil
5574
}

0 commit comments

Comments
 (0)