From 6e22b44c8616260948ea873178d8350a77bdf7f7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 11 Oct 2022 14:49:49 -0700 Subject: [PATCH 1/6] Cache flyte deck Signed-off-by: Kevin Su --- go.mod | 2 ++ go.sum | 4 ++-- .../nodes/task/catalog/datacatalog/datacatalog.go | 12 ++++++++---- .../nodes/task/catalog/datacatalog/transformer.go | 1 + 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 19e4fd4ee..d11ddac84 100644 --- a/go.mod +++ b/go.mod @@ -146,3 +146,5 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d + +replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847 diff --git a/go.sum b/go.sum index 963053582..bc861386d 100644 --- a/go.sum +++ b/go.sum @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.1.19 h1:1CtSbuFhFHwUbKdv66PqbcER01iacAJU+snh0eTsXc4= github.com/flyteorg/flyteidl v1.1.19/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U= -github.com/flyteorg/flyteplugins v1.0.15 h1:LewZIw2qSyGy34OoghYeuc7N/KazeVZvD0gNYXt/ZcM= -github.com/flyteorg/flyteplugins v1.0.15/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847 h1:qHqjEhvifpLlMaFAaSS6gm9UquJHTQQrH1+tzlsXaWY= +github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index b776d8c55..5bc3199af 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -4,6 +4,8 @@ import ( "context" "crypto/x509" "fmt" + "github.com/flyteorg/flytestdlib/storage" + "golang.org/x/exp/maps" "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -135,13 +137,14 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry md := EventCatalogMetadata(dataset.GetId(), relevantTag, source) outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact) + deckURI := storage.DataReference(artifact.GetMetadata().KeyMap[deckURIKey]) if err != nil { logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, &deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err } logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, &deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil } func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) { @@ -240,11 +243,12 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp } // Create the artifact for the execution that belongs in the task - cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier)) + artifactMetadata := GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier) + maps.Copy(artifactMetadata.KeyMap, reader.GetOutputMetadata(ctx)) + cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, artifactMetadata) if err != nil { return catalog.Status{}, errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String()) } - // Tag the artifact since it is the cached artifact tagName, err := GenerateArtifactTagName(ctx, inputs) if err != nil { diff --git a/pkg/controller/nodes/task/catalog/datacatalog/transformer.go b/pkg/controller/nodes/task/catalog/datacatalog/transformer.go index abf36f128..de02ec2fc 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/transformer.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/transformer.go @@ -219,6 +219,7 @@ const ( execProjectKey = "exec-project" execNodeIDKey = "exec-node" execTaskAttemptKey = "exec-attempt" + deckURIKey = "deck-uri" ) // Understanding Catalog Identifiers From 264614d94836934319982339dff32d1548ce0d18 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 11 Oct 2022 17:00:37 -0700 Subject: [PATCH 2/6] Cache flyte deck Signed-off-by: Kevin Su --- go.mod | 2 +- go.sum | 4 ++-- pkg/controller/nodes/branch/handler.go | 5 +++-- pkg/controller/nodes/dynamic/dynamic_workflow.go | 4 ++-- pkg/controller/nodes/subworkflow/launchplan.go | 4 ++-- pkg/controller/nodes/subworkflow/subworkflow.go | 3 ++- pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go | 2 +- pkg/controller/nodes/task/catalog/datacatalog/transformer.go | 2 +- pkg/controller/nodes/task/handler.go | 5 +++-- 9 files changed, 17 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index d11ddac84..0ebcf138c 100644 --- a/go.mod +++ b/go.mod @@ -147,4 +147,4 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d -replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847 +replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.16-0.20221011220618-4654389800fe diff --git a/go.sum b/go.sum index bc861386d..c577b44a7 100644 --- a/go.sum +++ b/go.sum @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.1.19 h1:1CtSbuFhFHwUbKdv66PqbcER01iacAJU+snh0eTsXc4= github.com/flyteorg/flyteidl v1.1.19/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U= -github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847 h1:qHqjEhvifpLlMaFAaSS6gm9UquJHTQQrH1+tzlsXaWY= -github.com/flyteorg/flyteplugins v1.0.16-0.20221011214704-ae6210e6c847/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteplugins v1.0.16-0.20221011220618-4654389800fe h1:SKV7Nn9aUHCVEVPP8/S+Qcl1t83bzzwz/6deAYIldPc= +github.com/flyteorg/flyteplugins v1.0.16-0.20221011220618-4654389800fe/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go index 9b0cd7f59..f4ae4bbec 100644 --- a/pkg/controller/nodes/branch/handler.go +++ b/pkg/controller/nodes/branch/handler.go @@ -3,7 +3,6 @@ package branch import ( "context" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" @@ -148,8 +147,10 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node if downstreamStatus.IsComplete() { // For branch node we set the output node to be the same as the child nodes output + deckURI := v1alpha1.GetDeckFile(childNodeStatus.GetOutputDir()) phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{ - OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())}, + OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir()), + DeckURI: &deckURI}, }) return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 2c7803b5a..7cdaed350 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -313,8 +313,8 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, ), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Failed to copy subworkflow outputs"}, nil } - - o = &handler.OutputInfo{OutputURI: destinationPath} + deckURI := v1alpha1.GetDeckFile(endNodeStatus.GetOutputDir()) + o = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: &deckURI} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index ec972d9fd..ad898ce28 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -213,8 +213,8 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow") } } - - oInfo = &handler.OutputInfo{OutputURI: outputFile} + deckURI := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir()) + oInfo = &handler.OutputInfo{OutputURI: outputFile, DeckURI: &deckURI} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index 24d74473f..e5d677bdf 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -118,7 +118,8 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler errMsg := fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]", sourcePath, destinationPath) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, errMsg, nil)), nil } - oInfo = &handler.OutputInfo{OutputURI: destinationPath} + DeckURI := v1alpha1.GetDeckFile(endNodeStatus.GetOutputDir()) + oInfo = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: &DeckURI} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 5bc3199af..266e78a95 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -137,7 +137,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry md := EventCatalogMetadata(dataset.GetId(), relevantTag, source) outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact) - deckURI := storage.DataReference(artifact.GetMetadata().KeyMap[deckURIKey]) + deckURI := storage.DataReference(artifact.GetMetadata().KeyMap[DeckURIKey]) if err != nil { logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, &deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err diff --git a/pkg/controller/nodes/task/catalog/datacatalog/transformer.go b/pkg/controller/nodes/task/catalog/datacatalog/transformer.go index de02ec2fc..34e0519e0 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/transformer.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/transformer.go @@ -219,7 +219,7 @@ const ( execProjectKey = "exec-project" execNodeIDKey = "exec-node" execTaskAttemptKey = "exec-attempt" - deckURIKey = "deck-uri" + DeckURIKey = "deck-uri" ) // Understanding Catalog Identifiers diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index a5685c6f7..99608fc9a 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -37,6 +37,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog/datacatalog" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/secretmanager" ) @@ -591,8 +592,8 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error()) return handler.UnknownTransition, err } - - pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) + deckPath := storage.DataReference(tCtx.ow.GetReader().GetOutputMetadata(ctx)[datacatalog.DeckURIKey]) + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), &deckPath, entry) } else { logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String()) pluginTrns.PopulateCacheInfo(entry) From e50763ac53abb3c78e3c4902c5f5526b141feca0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 12 Oct 2022 07:32:40 -0700 Subject: [PATCH 3/6] wip Signed-off-by: Kevin Su --- pkg/controller/nodes/branch/handler.go | 4 +--- pkg/controller/nodes/dynamic/dynamic_workflow.go | 3 +-- pkg/controller/nodes/subworkflow/subworkflow.go | 3 +-- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go index f4ae4bbec..8d8aafa89 100644 --- a/pkg/controller/nodes/branch/handler.go +++ b/pkg/controller/nodes/branch/handler.go @@ -147,10 +147,8 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node if downstreamStatus.IsComplete() { // For branch node we set the output node to be the same as the child nodes output - deckURI := v1alpha1.GetDeckFile(childNodeStatus.GetOutputDir()) phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{ - OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir()), - DeckURI: &deckURI}, + OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())}, }) return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 7cdaed350..9ddf9a3e8 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -313,8 +313,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, ), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Failed to copy subworkflow outputs"}, nil } - deckURI := v1alpha1.GetDeckFile(endNodeStatus.GetOutputDir()) - o = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: &deckURI} + o = &handler.OutputInfo{OutputURI: destinationPath} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index e5d677bdf..24d74473f 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -118,8 +118,7 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler errMsg := fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]", sourcePath, destinationPath) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, errMsg, nil)), nil } - DeckURI := v1alpha1.GetDeckFile(endNodeStatus.GetOutputDir()) - oInfo = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: &DeckURI} + oInfo = &handler.OutputInfo{OutputURI: destinationPath} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ From c1c4966ad7b4fa5d30b1afd4214d9aafb6538b89 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 12 Oct 2022 07:35:02 -0700 Subject: [PATCH 4/6] lint Signed-off-by: Kevin Su --- pkg/controller/nodes/branch/handler.go | 1 + pkg/controller/nodes/dynamic/dynamic_workflow.go | 1 + pkg/controller/nodes/subworkflow/launchplan.go | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go index 8d8aafa89..9b0cd7f59 100644 --- a/pkg/controller/nodes/branch/handler.go +++ b/pkg/controller/nodes/branch/handler.go @@ -3,6 +3,7 @@ package branch import ( "context" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 9ddf9a3e8..2c7803b5a 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -313,6 +313,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, ), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Failed to copy subworkflow outputs"}, nil } + o = &handler.OutputInfo{OutputURI: destinationPath} } diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index ad898ce28..ec972d9fd 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -213,8 +213,8 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow") } } - deckURI := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir()) - oInfo = &handler.OutputInfo{OutputURI: outputFile, DeckURI: &deckURI} + + oInfo = &handler.OutputInfo{OutputURI: outputFile} } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{ From 1a6093142ddf80d83a0cd814fcb39def03b49495 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 12 Oct 2022 11:50:08 -0700 Subject: [PATCH 5/6] Add tests Signed-off-by: Kevin Su --- pkg/controller/nodes/task/handler.go | 9 +++++++-- pkg/controller/nodes/task/handler_test.go | 10 ++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 99608fc9a..068a06de8 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -592,8 +592,13 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error()) return handler.UnknownTransition, err } - deckPath := storage.DataReference(tCtx.ow.GetReader().GetOutputMetadata(ctx)[datacatalog.DeckURIKey]) - pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), &deckPath, entry) + deckPathValue, ok := tCtx.ow.GetReader().GetOutputMetadata(ctx)[datacatalog.DeckURIKey] + if ok { + deckPath := storage.DataReference(deckPathValue) + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), &deckPath, entry) + } else { + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) + } } else { logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String()) pluginTrns.PopulateCacheInfo(entry) diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index bc2487a6a..5d1544d6c 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -48,6 +48,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" nodeMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler/mocks" + datacatalogClient "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog/datacatalog" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/codex" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins" @@ -59,6 +60,7 @@ var eventConfig = &controllerConfig.EventConfig{ } const testClusterID = "C1" +const deckPath = "deck.html" func Test_task_setDefault(t *testing.T) { type fields struct { @@ -908,6 +910,7 @@ func Test_task_Handle_Catalog(t *testing.T) { if tt.args.catalogFetch { or := &ioMocks.OutputReader{} or.OnDeckExistsMatch(mock.Anything).Return(true, nil) + or.OnGetOutputMetadataMatch(mock.Anything).Return(map[string]string{datacatalogClient.DeckURIKey: deckPath}) or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil) c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil) } else { @@ -935,6 +938,9 @@ func Test_task_Handle_Catalog(t *testing.T) { } if err == nil { assert.Equal(t, tt.want.handlerPhase.String(), got.Info().GetPhase().String()) + if tt.name == "cache-hit" { + assert.Equal(t, deckPath, got.Info().GetInfo().OutputInfo.DeckURI.String()) + } if assert.Equal(t, 1, len(ev.evs)) { e := ev.evs[0] assert.Equal(t, tt.want.eventPhase.String(), e.Phase.String()) @@ -1136,6 +1142,7 @@ func Test_task_Handle_Reservation(t *testing.T) { if tt.args.catalogFetch { or := &ioMocks.OutputReader{} or.OnDeckExistsMatch(mock.Anything).Return(true, nil) + or.OnGetOutputMetadataMatch(mock.Anything).Return(map[string]string{datacatalogClient.DeckURIKey: deckPath}) or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil) c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil) } else { @@ -1157,6 +1164,9 @@ func Test_task_Handle_Reservation(t *testing.T) { } if err == nil { assert.Equal(t, tt.want.handlerPhase.String(), got.Info().GetPhase().String()) + if tt.name == "cache-hit" { + assert.Equal(t, deckPath, got.Info().GetInfo().OutputInfo.DeckURI.String()) + } if assert.Equal(t, 1, len(ev.evs)) { e := ev.evs[0] assert.Equal(t, tt.want.eventPhase.String(), e.Phase.String()) From c158ab165b8de40d1bf2ac290bac39b98fc8abc1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 12 Oct 2022 17:03:55 -0700 Subject: [PATCH 6/6] Fix test error Signed-off-by: Kevin Su --- .../task/catalog/datacatalog/datacatalog.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 266e78a95..f74a0f151 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -4,9 +4,10 @@ import ( "context" "crypto/x509" "fmt" + "time" + "github.com/flyteorg/flytestdlib/storage" "golang.org/x/exp/maps" - "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" @@ -137,14 +138,22 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry md := EventCatalogMetadata(dataset.GetId(), relevantTag, source) outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact) - deckURI := storage.DataReference(artifact.GetMetadata().KeyMap[DeckURIKey]) + var deckURI *storage.DataReference + if artifact.GetMetadata() != nil { + deckURIValue, ok := artifact.GetMetadata().KeyMap[DeckURIKey] + if ok { + reference := storage.DataReference(deckURIValue) + deckURI = &reference + } + } + if err != nil { logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, &deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err } logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, &deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, deckURI, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil } func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {