Skip to content

Conversation

ddl-ebrown
Copy link
Contributor

@ddl-ebrown ddl-ebrown commented Jun 14, 2024

@ddl-rliu did most of the work on this one - making this an upstream PR as it resolved a real issue for us.

Tracking issue

Why are the changes needed?

  • Typically Flyte is configured so that each project / domain has its
    own Kubernetes namespace.

    Certain environments may change this behavior by using the Flyteadmin
    namespace_mapping setting to put all executions in fewer (or a singular)
    Kubernetes namespace. This is problematic because it can lead to
    collisions in the naming of the CR that flyteadmin generates.

What changes were proposed in this pull request?

  • This patch fixes 2 important things to make this work properly inside
    of Flyte:

    • it adds a random element to the CR name in Flyte so that the CR is
      named by the execution + some unique value when created by
      flyteadmin

      Without this change, an execution Foo in project A will prevent an
      execution Foo in project B from launching, because the name of the
      CR thats generated in Kubernetes assumes that the namespace the
      CRs are put into is different for project A and project B

      When namespace_mapping is set to a singular value, that assumption
      is wrong

    • it makes sure that when flytepropeller cleans up the CR resource
      that it uses Kubernetes labels to find the correct CR -- so instead
      of assuming that it can use the execution name, it instead uses the
      project, domain and execution labels

How was this patch tested?

This is deployed in a live Flyte setup where we have automated tests. We observed that the CR names were correctly unique after this and the initial collision no longer occurred.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR revamps Flyte's CR naming system to support namespace mapping by implementing label selector-based deletion logic and adding randomized hash-based suffixes to CR names. The changes prevent collisions when multiple executions share a Kubernetes namespace and include configuration options to toggle this behavior. Tests have been updated to validate both the new deletion logic and naming scheme.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 2

@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from 806c40b to 438dd97 Compare June 14, 2024 22:39
Copy link

codecov bot commented Jun 14, 2024

Codecov Report

Attention: Patch coverage is 84.61538% with 6 lines in your changes missing coverage. Please review.

Project coverage is 58.49%. Comparing base (c3869f8) to head (318435c).

Files with missing lines Patch % Lines
...ropeller/pkg/compiler/transformers/k8s/workflow.go 75.00% 5 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5480      +/-   ##
==========================================
+ Coverage   58.47%   58.49%   +0.02%     
==========================================
  Files         940      940              
  Lines       71567    71602      +35     
==========================================
+ Hits        41852    41887      +35     
+ Misses      26531    26530       -1     
- Partials     3184     3185       +1     
Flag Coverage Δ
unittests-datacatalog 59.03% <ø> (ø)
unittests-flyteadmin 56.31% <100.00%> (+0.05%) ⬆️
unittests-flytecopilot 30.99% <ø> (ø)
unittests-flytectl 64.70% <ø> (ø)
unittests-flyteidl 76.12% <ø> (ø)
unittests-flyteplugins 60.89% <ø> (ø)
unittests-flytepropeller 54.84% <75.00%> (+0.02%) ⬆️
unittests-flytestdlib 64.04% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from 438dd97 to 532888b Compare June 14, 2024 22:44
ctx,
v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
v1.ListOptions{
LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though new executions will have different CR names, this deletion mechanism is fully backwards compatible - thanks for a good solution @ddl-rliu !

@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch 2 times, most recently from 33e6e7d to f97c77a Compare June 14, 2024 23:27
@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from f97c77a to d994304 Compare June 14, 2024 23:52
Name: "name",
},
"namespace")
// make sure real CR has randomized suffix
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the simplest way to ensure existing tests continue to pass

rand.Seed(seed)
// K8s has a limitation of 63 chars
name = name[:minInt(63-ExecutionIDSuffixLength, len(name))]
execName := name + "-" + rand.String(ExecutionIDSuffixLength-1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this randomization, use of namespace_mapping in the config

const (
namespaceMappingKey = "namespace_mapping"
defaultTemplate = "{{ project }}-{{ domain }}"
)
var namespaceMappingConfig = config.MustRegisterSection(namespaceMappingKey, &interfaces.NamespaceMappingConfig{
Template: defaultTemplate,
})
with a value like foo causes problems when executions have the same names across projects

@davidmirror-ops davidmirror-ops requested a review from hamersaw June 17, 2024 12:21
Copy link
Contributor

@hamersaw hamersaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.

}
}

const ExecutionIDSuffixLength = 21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a configurable value and if set to 0 (default?) then have it disabled (so no random characters are appended). My concern is that if anything relies on the CR name, this will break it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we found / updated the spots where the name is a "contract" -- but if we want to be extra super careful we could make this configurable.

That said, I think the tradeoff we have to consider is:

  • backward compatibility vs
  • adding extra config / managing different behaviors

I would probably vote for not introducing an extra config and keeping this behavior not configurable (I'd argue prior behavior was a bug), but admit to not knowing the potential blast radius beyond core Flyte (i.e. plugins and such).

I'm happy to go either way since it's not my project :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I understand the config bloat all to well :). As you suggest, my main concern is breaking backwards compatibility here. I know there are Flyte users that rely on the FlyteWorkflow CR to be named identical to the execution ID, which this would break. For me to be comfortable merging this, I feel it should be defaulted to the current behavior. cc @eapolinario thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks!

If you know there are users depending on the existing CR naming scheme somehow, then making this behavior configurable seems like the only thing to do right now. I can update my PR.

Not sure if you track potential breaking change tickets anywhere, but maybe file one away to remove the option to configure this on the next major release boundary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I've updated the PR to address this and explain a bit more here: #5480 (comment)

@kumare3
Copy link
Contributor

kumare3 commented Jun 25, 2024

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

@ddl-ebrown
Copy link
Contributor Author

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

Ah thanks @kumare3 for the heads up! We clearly didn't realize there was something internal to Flyte that depends on deterministic naming for CRs -- will make some updates taking that into account as well

@ddl-ebrown
Copy link
Contributor Author

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

Ah thanks @kumare3 for the heads up! We clearly didn't realize there was something internal to Flyte that depends on deterministic naming for CRs -- will make some updates taking that into account as well

Also, should mention @kumare3 that if by "leaky" you meant "CR might not be deleted from the cluster", the deletion process is robust because this uses the actual key of the workflow in conjunction with CR labels to perform deletes, rather than the CR name.

If there are dupe CRs for the same workflow though, that's clearly an issue regardless :)

@EngHabu
Copy link
Contributor

EngHabu commented Jun 25, 2024

@ddl-ebrown I agree with not introducing randomization... specially that the name already starts with a random string :-)

Instead, I would update this call to use something like project-domain-rand(10) and hash that and that becomes the execution name...

I would also make the length of the execution name configurable in flyteadmin. so in your deployment you can make it longer and give you better entropy...

@ddl-rliu ddl-rliu force-pushed the change-flyte-CR-naming-scheme branch 6 times, most recently from 33a99f0 to 93bb9a5 Compare July 18, 2024 22:46
Comment on lines 462 to 461
}
// make sure real CR has randomized suffix
assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name)
// then reset for the sake of serialization comparison
flyteWf.Name = "name"

Copy link
Contributor

@ddl-rliu ddl-rliu Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cr-name-scheme-change is now non-default, and tests are added here, so removing these checks.

Comment on lines 252 to 257
hashedIdentifier := hashIdentifier(core.Identifier{
Project: project,
Domain: domain,
Name: name,
})
rand.Seed(int64(hashedIdentifier))
Copy link
Contributor

@ddl-rliu ddl-rliu Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By hashing exactly these fields project, domain, name we prevent duplicate CRs from being created for the same execution (they will hash to the same CR name and k8s will prevent those duplicate CRs).

Edit: Regarding the hashIdentifier function, see this for reference.

Comment on lines 259 to 268
if workflowCRNameHashLength := config.GetConfig().WorkflowCRNameHashLength; workflowCRNameHashLength > 0 {
obj.ObjectMeta.Name = rand.String(workflowCRNameHashLength)
} else {
obj.ObjectMeta.Name = name
}
Copy link
Contributor

@ddl-rliu ddl-rliu Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default WorkflowCRNameHashLength is 0 so the hash CR naming scheme will only be used when the deployment is specially configured to do so. See 93bb9a5#diff-67941e1320bd66fda570ebcf1f1c5c5221d608b136d662705e168a07a5d166ef

rand.Seed(int64(hashedIdentifier))

if workflowCRNameHashLength := config.GetConfig().WorkflowCRNameHashLength; workflowCRNameHashLength > 0 {
obj.ObjectMeta.Name = rand.String(workflowCRNameHashLength)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't the name of the CR just incorporate the k8s namespace though and still be stable? I think using just a hash for the CR name loses some valuable information when you're glancing at the resource names now.

Copy link
Contributor

@ddl-rliu ddl-rliu Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated #5480 (comment)

Now, for a setting like WorkflowCRNameHashLength: 32, the CR name will look like mywfname-f5ptmmd66pktp59nphfdjkqr9szp8fv5 or mywfname-wnw9zlg22z5wmdjggdcrnzdrpw59m4bc. Note that the suffix is deterministic, and uses a hash of the name+project+domain.

I opted for this approach rather than mywfname-project, this is because of the character limit of the CR name and how this affects executions that have a long execution name and a long project name, where improperly handled truncation will lead to undesired behavior such as executions not starting as a result of name conflicts. For reference.

@ddl-rliu ddl-rliu force-pushed the change-flyte-CR-naming-scheme branch 2 times, most recently from f46550d to d4a0e72 Compare July 23, 2024 18:02
Comment on lines 264 to 269
base := name + "-"
maxNameLength := allowedExecutionNameLength - workflowCRNameHashLength
if len(base) > maxNameLength {
base = base[:maxNameLength]
}
obj.ObjectMeta.Name = fmt.Sprintf("%s%s", base, rand.String(workflowCRNameHashLength))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to bikeshed too much on this one, but a couple things...

You can skip some of that slice logic around length if you just use format specifiers to set the "max"

maxNameLength := allowedExecutionNameLength - workflowCRNameHashLength - 1
if s, err := strconv.Atoi(maxNameLength); err == nil {
    format := "[%." + s+ "s]-%s"
    obj.ObjectMeta.Name = fmt.Sprintf(format, base, rand.String(workflowCRNameHashLength))
}

Even better -- does WorkflowCRNameHashLength really need to be configurable or can it just be on / off? If it's sufficiently long enough to prevent collisions (and I think we can pick something), then you could simplify this length calculation logic and just limit the string statically using a format specifier like this (let's assume its set to 24, for example):

maxBase := allowedExecutionNameLength - workflowCRNameHashLength - 1
obj.ObjectMeta.Name = fmt.Sprintf("[%.38s]-%s", base, rand.String(workflowCRNameHashLength))

Copy link
Contributor Author

@ddl-ebrown ddl-ebrown Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The k8s rand util you're using has 32 possible characters... I think once you hit a string of length 24 you're in UUID territory (which is 5.31x10^36 and generally considered more than anyone would ever need)

I don't think something that long is remotely necessary for this use case though - the primary concern is that the seeded random number generator doesn't produce a duplicate random string given two different seeds (i.e. for an execution with the same name and domain, but in a different project). For that, even something like 10 characters should be enough I think, which is why I would advocate for picking something sensible. I know maintainers suggested making it configurable, but that feels unnecessary IMHO.

32^10 is 1.13x10^15

Using the collision calculator at https://devina.io/collision-calculator to understand this in more human-friendly terms

~1 year (or 3.43e+7 seconds) needed, in order to have a 1% probability of at least one collision if 500 ID's are generated every hour.

Copy link
Contributor

@ddl-rliu ddl-rliu Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A fixed suffix length of 10 sounds good to me. Agreed about the sufficient amount of entropy on relatively high values for the suffix length. Just updated the PR: 15b57cb

@ddl-rliu ddl-rliu force-pushed the change-flyte-CR-naming-scheme branch 2 times, most recently from 70accda to 15b57cb Compare July 25, 2024 23:22
@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from 15b57cb to a268b61 Compare August 28, 2024 16:38
@davidmirror-ops davidmirror-ops requested a review from pvditt October 3, 2024 16:50
@eapolinario
Copy link
Contributor

Cleaning stale PRs. Please reopen if you wan to discuss this further.

@eapolinario eapolinario closed this Mar 3, 2025
@ddl-ebrown
Copy link
Contributor Author

ddl-ebrown commented Mar 4, 2025

FYI - We're still carrying a patch around this as the it's necessary to fix the underlying bug inside of Flyte. I would love to see this resolved upstream so that we don't need to carry the patch.

What do we think the path is to making that happen @eapolinario?

From deeaf0e25bf67d382fe0080afc4832de4970a084 Mon Sep 17 00:00:00 2001
From: Richard Liu <[email protected]>
Date: Thu, 13 Jun 2024 21:51:40 -0700
Subject: [PATCH] Change Flyte CR naming scheme to better support
 namespace_mapping

 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the Flyteadmin
   namespace_mapping setting to put all executions in fewer (or a singular)
   Kubernetes namespace. This is problematic because it can lead to
   collisions in the naming of the CR that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR thats generated in Kubernetes *assumes* that the namespace the
     CRs are put into is different for project A and project B

     When namespace_mapping is set to a singular value, that assumption
     is wrong

   * it makes sure that when flytepropeller cleans up the CR resource
     that it uses Kubernetes labels to find the correct CR -- so instead
     of assuming that it can use the execution name, it instead uses the
     project, domain and execution labels

 - Adds use-workflow-cr-name-suffix setting as a true / false value to
   determine whether to append a deterministic hash of execution id, name,
   project, as the FlyteWorkflow CR name. This uses a fixed length of 10
   which introduces enough entropy to prevent collisions in practical
   terms

Signed-off-by: ddl-ebrown <[email protected]>
Signed-off-by: ddl-rliu <[email protected]>
---
 .../pkg/workflowengine/impl/k8s_executor.go   | 28 +++++++++++--
 .../workflowengine/impl/k8s_executor_test.go  | 27 +++++++-----
 .../pkg/compiler/transformers/k8s/workflow.go | 37 +++++++++++++++-
 .../transformers/k8s/workflow_test.go         | 42 +++++++++++++++++++
 .../pkg/controller/config/config.go           |  2 +
 5 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
index d941cc830..c7a73e983 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
@@ -12,6 +12,7 @@ import (
 	execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
 	runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
 	"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
+	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytestdlib/logger"
 	"github.com/flyteorg/flyte/flytestdlib/storage"
 )
@@ -87,6 +88,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
 	}, nil
 }
 
+const (
+	// Labels that are set on the FlyteWorkflow CRD
+	DomainLabel      = "domain"
+	ExecutionIDLabel = "execution-id"
+	ProjectLabel     = "project"
+)
+
+func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
+	return &v1.LabelSelector{
+		MatchLabels: map[string]string{
+			DomainLabel:      executionID.GetDomain(),
+			ExecutionIDLabel: executionID.GetName(),
+			ProjectLabel:     executionID.GetProject(),
+		},
+	}
+}
+
 func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
 	target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
 		TargetID: data.Cluster,
@@ -94,9 +112,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
 	if err != nil {
 		return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
 	}
-	err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
-		PropagationPolicy: &deletePropagationBackground,
-	})
+	err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
+		ctx,
+		v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
+		v1.ListOptions{
+			LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
+		},
+	)
 	// An IsNotFound error indicates the resource is already deleted.
 	if err != nil && !k8_api_err.IsNotFound(err) {
 		return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
index a2ecb5136..bc3888667 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
@@ -31,11 +31,11 @@ import (
 var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}
 
 type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
-type deleteCallback func(name string, options *v1.DeleteOptions) error
+type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error
 type FakeFlyteWorkflow struct {
 	v1alpha12.FlyteWorkflowInterface
-	createCallback createCallback
-	deleteCallback deleteCallback
+	createCallback
+	deleteCollectionCallback
 }
 
 func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
@@ -45,9 +45,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl
 	return nil, nil
 }
 
-func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error {
-	if b.deleteCallback != nil {
-		return b.deleteCallback(name, &options)
+func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
+	if b.deleteCollectionCallback != nil {
+		return b.deleteCollectionCallback(&opts, &listOpts)
 	}
 	return nil
 }
@@ -280,8 +280,15 @@ func TestExecute_MiscError(t *testing.T) {
 
 func TestAbort(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
-		assert.Equal(t, execID.Name, name)
+	fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error {
+		selector := v1.FormatLabelSelector(&v1.LabelSelector{
+			MatchLabels: map[string]string{
+				DomainLabel:      execID.GetDomain(),
+				ExecutionIDLabel: execID.GetName(),
+				ProjectLabel:     execID.GetProject(),
+			},
+		})
+		assert.Equal(t, selector, listOpts.LabelSelector)
 		assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
 		return nil
 	}
@@ -302,7 +309,7 @@ func TestAbort(t *testing.T) {
 
 func TestAbort_Notfound(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+	fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
 		return k8_api_err.NewNotFound(schema.GroupResource{
 			Group:    "foo",
 			Resource: "bar",
@@ -325,7 +332,7 @@ func TestAbort_Notfound(t *testing.T) {
 
 func TestAbort_MiscError(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+	fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
 		return errors.New("call failed")
 	}
 	fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
index 2421ddf9b..73e518c13 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
@@ -2,17 +2,21 @@
 package k8s
 
 import (
+	"context"
 	"fmt"
 	"hash/fnv"
 	"strings"
 
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/rand"
 
 	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
+	"github.com/flyteorg/flyte/flytestdlib/logger"
 )
 
 const (
@@ -30,6 +34,11 @@ const (
 	ShardKeyLabel = "shard-key"
 	// The fully qualified FlyteWorkflow name
 	WorkflowNameLabel = "workflow-name"
+
+	// Length of hash to use as a suffix on the workflow CR name. Used when config.UseWorkflowCRNameSuffix is true.
+	// The workflow CR name should be at or under 63 characters long, here it is 52 + 1 + 10 = 63
+	workflowCRNameHashLength = 10
+	workflowCRNameSuffixFmt  = "%.52s-%s"
 )
 
 func requiresInputs(w *core.WorkflowTemplate) bool {
@@ -159,6 +168,20 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
 	}
 }
 
+func hashIdentifier(identifier core.Identifier) uint64 {
+	h := fnv.New64()
+	_, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s",
+		identifier.Project, identifier.Domain, identifier.Name)))
+	if err != nil {
+		// This shouldn't occur.
+		logger.Errorf(context.Background(),
+			"failed to hash execution identifier: %+v with err: %v", identifier, err)
+		return 0
+	}
+	logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64())
+	return h.Sum64()
+}
+
 // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
 func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
 	executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
@@ -231,7 +254,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
 		errs.Collect(errors.NewWorkflowBuildError(err))
 	}
 
-	obj.ObjectMeta.Name = name
+	if config.GetConfig().UseWorkflowCRNameSuffix {
+		// Seed the randomness before generating the name with random suffix
+		hashedIdentifier := hashIdentifier(core.Identifier{
+			Project: project,
+			Domain:  domain,
+			Name:    name,
+		})
+		rand.Seed(int64(hashedIdentifier))
+		obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
+	} else {
+		obj.ObjectMeta.Name = name
+	}
+
 	obj.ObjectMeta.GenerateName = generatedName
 	obj.ObjectMeta.Labels[ExecutionIDLabel] = label
 	obj.ObjectMeta.Labels[ProjectLabel] = project
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
index dbb51e25e..893f81119 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
 	"github.com/flyteorg/flyte/flytestdlib/utils"
 )
 
@@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) {
 	assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue())
 }
 
+func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) {
+	for name, tt := range map[string]struct {
+		useSuffix bool
+		expected  string
+	}{
+		"default does not use hash as workflow CR name": {
+			useSuffix: false,
+			expected:  "",
+		},
+		"use hash as workflow CR name": {
+			useSuffix: true,
+			expected:  "-x6m7gswrdl",
+		},
+	} {
+		t.Run(name, func(t *testing.T) {
+			flyteConfig := config.GetConfig()
+			flyteConfig.UseWorkflowCRNameSuffix = tt.useSuffix
+
+			w := createSampleMockWorkflow()
+
+			errors.SetConfig(errors.Config{IncludeSource: true})
+			wf, err := BuildFlyteWorkflow(
+				&core.CompiledWorkflowClosure{
+					Primary: w.GetCoreWorkflow(),
+					Tasks: []*core.CompiledTask{
+						{
+							Template: &core.TaskTemplate{
+								Id: &core.Identifier{Name: "ref_1"},
+							},
+						},
+					},
+				},
+				nil, nil, "")
+			assert.Equal(t, tt.expected, wf.ObjectMeta.Name)
+			assert.NoError(t, err)
+			assert.NotNil(t, wf)
+			errors.SetConfig(errors.Config{})
+		})
+	}
+}
+
 func TestGenerateName(t *testing.T) {
 	t.Run("Invalid params", func(t *testing.T) {
 		_, _, _, _, _, err := generateName(nil, nil)
diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go
index a0217e186..7700bc873 100644
--- a/flytepropeller/pkg/controller/config/config.go
+++ b/flytepropeller/pkg/controller/config/config.go
@@ -120,6 +120,7 @@ var (
 			EventVersion:               0,
 			DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
 		},
+		UseWorkflowCRNameSuffix: false,
 	}
 )
 
@@ -161,6 +162,7 @@ type Config struct {
 	NodeExecutionWorkerCount int                     `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
 	ArrayNode                ArrayNodeConfig         `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
	LiteralOffloadingConfig  LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."`
+	UseWorkflowCRNameSuffix  bool                    `json:"use-workflow-cr-name-suffix" pflag:",If false, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as a suffix on the CR name."`
 }
 
 // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
-- 
2.45.2

@ddl-ebrown ddl-ebrown reopened this Mar 4, 2025
@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from a268b61 to deeaf0e Compare March 20, 2025 16:31
 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the Flyteadmin
   namespace_mapping setting to put all executions in fewer (or a singular)
   Kubernetes namespace. This is problematic because it can lead to
   collisions in the naming of the CR that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR thats generated in Kubernetes *assumes* that the namespace the
     CRs are put into is different for project A and project B

     When namespace_mapping is set to a singular value, that assumption
     is wrong

   * it makes sure that when flytepropeller cleans up the CR resource
     that it uses Kubernetes labels to find the correct CR -- so instead
     of assuming that it can use the execution name, it instead uses the
     project, domain and execution labels

 - Adds use-workflow-cr-name-suffix setting as a true / false value to
   determine whether to append a deterministic hash of execution id, name,
   project, as the FlyteWorkflow CR name. This uses a fixed length of 10
   which introduces enough entropy to prevent collisions in practical
   terms

Signed-off-by: ddl-ebrown <[email protected]>
Signed-off-by: ddl-rliu <[email protected]>
@ddl-ebrown ddl-ebrown force-pushed the change-flyte-CR-naming-scheme branch from deeaf0e to 318435c Compare March 20, 2025 16:38
@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 20, 2025

Code Review Agent Run #547877

Actionable Suggestions - 1
  • flytepropeller/pkg/compiler/transformers/k8s/workflow.go - 1
    • Consider using thread-safe random generation · Line 264-265
Review Details
  • Files reviewed - 5 · Commit Range: 318435c..318435c
    • flyteadmin/pkg/workflowengine/impl/k8s_executor.go
    • flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
    • flytepropeller/pkg/compiler/transformers/k8s/workflow.go
    • flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
    • flytepropeller/pkg/controller/config/config.go
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful

Bito Usage Guide

Commands

Type the following command in the pull request comment and save the comment.

  • /review - Manually triggers a full AI review.

Refer to the documentation for additional commands.

Configuration

This repository uses code_review_bito You can customize the agent settings here or contact your Bito workspace admin at [email protected].

Documentation & Help

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Collaborator

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Bug Fix - Correct CR Deletion with Label Selector

k8s_executor.go - Updated deletion logic in Abort to call DeleteCollection using a label selector, ensuring proper cleanup of FlyteWorkflows in shared namespaces.

Feature Improvement - Enhanced Workflow CR Naming Scheme

workflow.go - Introduced a hash-based random suffix during workflow CR name generation to avoid naming collisions.

config.go - Added a configuration flag (UseWorkflowCRNameSuffix) to toggle the use of a hashed suffix for CR names based on the execution identifier.

Testing - Tests Adjustments for CR Naming and Deletion

k8s_executor_test.go - Modified test callback types and assertions to support the new DeleteCollection API for CR deletion.

workflow_test.go - Implemented tests to verify behavior of the workflow CR naming logic under different configuration settings.

Comment on lines +264 to +265
rand.Seed(int64(hashedIdentifier))
obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using thread-safe random generation

The code is using rand.Seed() with a hash of the identifier, but this approach has a potential issue. The rand.Seed() function is not concurrency-safe and could lead to race conditions if multiple workflows are being processed concurrently. Consider using a separate random number generator instance with rand.New(rand.NewSource(int64(hashedIdentifier))) instead of the global random number generator.

Code suggestion
Check the AI-generated fix before applying
Suggested change
rand.Seed(int64(hashedIdentifier))
obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
// Create a separate random generator to avoid race conditions
rng := rand.New(rand.NewSource(int64(hashedIdentifier)))
obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rng.String(workflowCRNameHashLength))

Code Review Run #547877


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants