Skip to content

Commit a7f3de5

Browse files
committed
Cleanup flyte propeller
Signed-off-by: Jason Parraga <[email protected]>
1 parent 68fc4e6 commit a7f3de5

File tree

25 files changed

+62
-150
lines changed

25 files changed

+62
-150
lines changed

flytepropeller/events/local_eventsink.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,5 @@ func NewFileSink(path string) (EventSink, error) {
6565

6666
w := bufio.NewWriter(f)
6767

68-
if err != nil {
69-
return nil, err
70-
}
7168
return &localSink{writer: &FileWriter{ioWriter: w}}, nil
7269
}

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (in *WorkflowStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID)
145145

146146
dataDir, err := in.ConstructNodeDataDir(ctx, id)
147147
if err != nil {
148-
logger.Errorf(ctx, "Failed to construct data dir for node [%v], exec id [%v]", id)
148+
logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id)
149149
return n
150150
}
151151

flytepropeller/pkg/controller/controller.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,9 +446,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
446446
return nil, errors.Wrapf(err, "failed to create node handler factory")
447447
}
448448

449-
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
450-
launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
451-
catalogClient, recoveryClient, cfg.LiteralOffloadingConfig, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
449+
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, storage.DataReference(cfg.DefaultRawOutputPrefix), catalogClient, recoveryClient, cfg.LiteralOffloadingConfig, &cfg.EventConfig, cfg.ClusterID, nodeHandlerFactory, scope)
452450
if err != nil {
453451
return nil, errors.Wrapf(err, "Failed to create Controller.")
454452
}

flytepropeller/pkg/controller/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
206206
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] Stale.", namespace, name)
207207
return nil
208208
}
209-
logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off", fetchErr)
209+
logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off: %v", fetchErr)
210210
return fetchErr
211211
}
212212

flytepropeller/pkg/controller/nodes/array/handler_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@ import (
2323
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes"
2424
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog"
2525
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/errors"
26-
gatemocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/gate/mocks"
2726
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler"
2827
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
2928
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks"
3029
recoverymocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/recovery/mocks"
31-
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
3230
"github.com/flyteorg/flyte/flytestdlib/bitarray"
3331
"github.com/flyteorg/flyte/flytestdlib/contextutils"
3432
"github.com/flyteorg/flyte/flytestdlib/promutils"
@@ -52,22 +50,18 @@ var (
5250

5351
func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler interfaces.NodeHandler, dataStore *storage.DataStore, scope promutils.Scope) (interfaces.NodeHandler, error) {
5452
// mock components
55-
adminClient := launchplan.NewFailFastLaunchPlanExecutor()
5653
enqueueWorkflowFunc := func(workflowID v1alpha1.WorkflowID) {}
5754
eventConfig := &config.EventConfig{ErrorOnAlreadyExists: true}
5855
offloadingConfig := config.LiteralOffloadingConfig{Enabled: false}
5956
literalOffloadingConfig := config.LiteralOffloadingConfig{Enabled: true, MinSizeInMBForOffloading: 1024, MaxSizeInMBForOffloading: 1024 * 1024}
6057
mockEventSink := eventmocks.NewMockEventSink()
6158
mockHandlerFactory := &mocks.HandlerFactory{}
6259
mockHandlerFactory.EXPECT().GetHandler(mock.Anything).Return(nodeHandler, nil)
63-
mockKubeClient := execmocks.NewFakeKubeClient()
6460
mockRecoveryClient := &recoverymocks.Client{}
65-
mockSignalClient := &gatemocks.SignalServiceClient{}
6661
noopCatalogClient := catalog.NOOPCatalog{}
6762

6863
// create node executor
69-
nodeExecutor, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, dataStore, enqueueWorkflowFunc, mockEventSink, adminClient,
70-
adminClient, "s3://bucket/", mockKubeClient, noopCatalogClient, mockRecoveryClient, offloadingConfig, eventConfig, "clusterID", mockSignalClient, mockHandlerFactory, scope)
64+
nodeExecutor, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, dataStore, enqueueWorkflowFunc, mockEventSink, "s3://bucket/", noopCatalogClient, mockRecoveryClient, offloadingConfig, eventConfig, "clusterID", mockHandlerFactory, scope)
7165
assert.NoError(t, err)
7266

7367
// return ArrayNodeHandler

flytepropeller/pkg/controller/nodes/attr_path_resolver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// resolveAttrPathInPromise resolves the literal with attribute path
1818
// If the promise is chained with attributes (e.g. promise.a["b"][0]), then we need to resolve the promise
1919
func resolveAttrPathInPromise(ctx context.Context, datastore *storage.DataStore, nodeID string, literal *core.Literal, bindAttrPath []*core.PromiseAttribute) (*core.Literal, error) {
20-
var currVal *core.Literal = literal
20+
var currVal = literal
2121
var tmpVal *core.Literal
2222
var err error
2323
var exist bool

flytepropeller/pkg/controller/nodes/branch/evaluator.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
const ErrorCodeUserProvidedError = "UserProvidedError"
1616
const ErrorCodeMalformedBranch = "MalformedBranchUserError"
1717
const ErrorCodeCompilerError = "CompilerError"
18-
const ErrorCodeFailedFetchOutputs = "FailedFetchOutputs"
1918

2019
func EvaluateComparison(expr *core.ComparisonExpression, nodeInputs *core.LiteralMap) (bool, error) {
2120
var lValue *core.Literal
@@ -94,7 +93,7 @@ func EvaluateIfBlock(block v1alpha1.ExecutableIfBlock, nodeInputs *core.LiteralM
9493
// Decides the branch to be taken, returns the nodeId of the selected node or an error
9594
// The branchNode is marked as success. This is used by downstream node to determine if it can be executed
9695
// All downstream nodes are marked as skipped
97-
func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1.NodeID, node v1alpha1.ExecutableBranchNode, nodeInputs *core.LiteralMap) (*v1alpha1.NodeID, error) {
96+
func DecideBranch(ctx context.Context, nl executors.NodeLookup, node v1alpha1.ExecutableBranchNode, nodeInputs *core.LiteralMap) (*v1alpha1.NodeID, error) {
9897
var selectedNodeID *v1alpha1.NodeID
9998
var skippedNodeIds []*v1alpha1.NodeID
10099
var err error

flytepropeller/pkg/controller/nodes/branch/evaluator_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func TestDecideBranch(t *testing.T) {
355355
DataReferenceConstructor: dataStore,
356356
}
357357
branchNode := &v1alpha1.BranchNodeSpec{}
358-
b, err := DecideBranch(ctx, w, "n1", branchNode, nil)
358+
b, err := DecideBranch(ctx, w, branchNode, nil)
359359
assert.Error(t, err)
360360
assert.Nil(t, b)
361361
})
@@ -381,7 +381,7 @@ func TestDecideBranch(t *testing.T) {
381381
ThenNode: nil,
382382
},
383383
}
384-
b, err := DecideBranch(ctx, w, "n1", branchNode, inputs)
384+
b, err := DecideBranch(ctx, w, branchNode, inputs)
385385
assert.Error(t, err)
386386
assert.Nil(t, b)
387387
e, ok := errors.GetErrorCode(err)
@@ -416,7 +416,7 @@ func TestDecideBranch(t *testing.T) {
416416
ThenNode: &n1,
417417
},
418418
}
419-
b, err := DecideBranch(ctx, w, "n1", branchNode, inputs)
419+
b, err := DecideBranch(ctx, w, branchNode, inputs)
420420
assert.NoError(t, err)
421421
assert.NotNil(t, b)
422422
assert.Equal(t, n1, *b)
@@ -466,7 +466,7 @@ func TestDecideBranch(t *testing.T) {
466466
},
467467
},
468468
}
469-
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
469+
b, err := DecideBranch(ctx, w, branchNode, inputs)
470470
assert.NoError(t, err)
471471
assert.NotNil(t, b)
472472
assert.Equal(t, n1, *b)
@@ -517,7 +517,7 @@ func TestDecideBranch(t *testing.T) {
517517
},
518518
},
519519
}
520-
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
520+
b, err := DecideBranch(ctx, w, branchNode, inputs)
521521
assert.NoError(t, err)
522522
assert.NotNil(t, b)
523523
assert.Equal(t, n2, *b)
@@ -570,7 +570,7 @@ func TestDecideBranch(t *testing.T) {
570570
},
571571
Else: &n3,
572572
}
573-
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
573+
b, err := DecideBranch(ctx, w, branchNode, inputs)
574574
assert.NoError(t, err)
575575
assert.NotNil(t, b)
576576
assert.Equal(t, n3, *b)
@@ -620,7 +620,7 @@ func TestDecideBranch(t *testing.T) {
620620
},
621621
Else: &n3,
622622
}
623-
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
623+
b, err := DecideBranch(ctx, w, branchNode, inputs)
624624
assert.Error(t, err)
625625
assert.Nil(t, b)
626626
ec, ok := errors.GetErrorCode(err)
@@ -677,7 +677,7 @@ func TestDecideBranch(t *testing.T) {
677677
},
678678
}
679679

680-
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
680+
b, err := DecideBranch(ctx, w, branchNode, inputs)
681681
assert.Error(t, err)
682682
assert.Nil(t, b)
683683
ec, ok := errors.GetErrorCode(err)

flytepropeller/pkg/controller/nodes/branch/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha
4545
errMsg := fmt.Sprintf("Failed to read input. Error [%s]", err)
4646
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, errMsg, nil)), nil
4747
}
48-
finalNodeID, err := DecideBranch(ctx, nl, nCtx.NodeID(), branchNode, nodeInputs)
48+
finalNodeID, err := DecideBranch(ctx, nl, branchNode, nodeInputs)
4949
if err != nil {
5050
ec, ok := stdErrors.GetErrorCode(err)
5151
if ok {

flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry
151151
func (m *CatalogClient) createDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {
152152
datasetID, err := GenerateDatasetIDForTask(ctx, key)
153153
if err != nil {
154-
logger.Errorf(ctx, "DataCatalog failed to generate dataset for ID: %s, err: %s", key.Identifier, err)
154+
logger.Errorf(ctx, "DataCatalog failed to generate dataset for ID: %v, err: %s", key.Identifier, err)
155155
return nil, err
156156
}
157157

@@ -164,7 +164,7 @@ func (m *CatalogClient) createDataset(ctx context.Context, key catalog.Key, meta
164164
if err != nil {
165165
logger.Debugf(ctx, "Create dataset %v return err %v", datasetID, err)
166166
if status.Code(err) == codes.AlreadyExists {
167-
logger.Debugf(ctx, "Create Dataset for ID %s already exists", key.Identifier)
167+
logger.Debugf(ctx, "Create Dataset for ID %v already exists", key.Identifier)
168168
} else {
169169
logger.Errorf(ctx, "Unable to create dataset %s, err: %s", datasetID, err)
170170
return nil, err

0 commit comments

Comments
 (0)