Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions boilerplate/flyte/end2end/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def schedule_workflow_groups(
non_succeeded_executions = []
for execution in executions:
if execution.closure.phase != WorkflowExecutionPhase.SUCCEEDED:
print(f"workflow is in {execution.closure.phase}")
non_succeeded_executions.append(execution)
# Report failing cases
if len(non_succeeded_executions) != 0:
Expand Down
8 changes: 8 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"fmt"
"os"
"runtime/pprof"
"sync/atomic"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -93,6 +94,7 @@
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
executionStats *workflowstore.ExecutionStatsMonitor
leader atomic.Bool
}

// Run either as a leader -if configured- or as a standalone process.
Expand Down Expand Up @@ -133,6 +135,7 @@

// Called from leader elector -if configured- to start running as the leader.
func (c *Controller) onStartedLeading(ctx context.Context) {
c.leader.Store(true)

Check warning on line 138 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L138

Added line #L138 was not covered by tests
backgroundCtx, cancelNow := context.WithCancel(ctx)
logger.Infof(ctx, "Acquired leader lease.")
go func() {
Expand All @@ -142,6 +145,7 @@
}()

<-backgroundCtx.Done()

Check warning on line 148 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L148

Added line #L148 was not covered by tests
logger.Infof(ctx, "Lost leader lease.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to set c.leader to false here, so that it stops enqueuing workflows once it is no longer the leader?

cancelNow()
}
Expand All @@ -157,6 +161,10 @@
return
}
key := wf.GetK8sWorkflowID()
if !c.leader.Load() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the leader elector is not configured, it seems that c.leader will never be set to true, so this check will ignore every workflow and never enqueue any of them.

logger.Debug(ctx, "Ignoring workflow [%v] as non-leader", key)
return
}

Check warning on line 167 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L164-L167

Added lines #L164 - L167 were not covered by tests
logger.Infof(ctx, "==> Enqueueing workflow [%v]", key)
c.workQueue.AddRateLimited(key.String())
}
Expand Down
Loading