diff --git a/events/admin_eventsink_integration_test.go b/events/admin_eventsink_integration_test.go index 44c23a344..4875e14f1 100644 --- a/events/admin_eventsink_integration_test.go +++ b/events/admin_eventsink_integration_test.go @@ -1,4 +1,6 @@ +//go:build integration // +build integration + // Add this tag to your project settings if you want to pick it up. package events @@ -30,11 +32,17 @@ var ( ) // To run this test, and see if the deadline working, pick an existing successful execution from your admin database -// select * from executions; +// +// select * from executions; +// // Then delete all the events from it. -// delete from execution_events where execution_name = 'ikuy55mn0y'; +// +// delete from execution_events where execution_name = 'ikuy55mn0y'; +// // Then run this -// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work; +// +// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work; +// // This will lock your table so that admin can't read it, causing the grpc call to timeout. // On timeout, you should get a deadline exceeded error. Otherwise, you should get an error to the effect of // "Invalid phase change from SUCCEEDED to RUNNING" or something like that. diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index d7300818b..916c1e5b3 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -83,10 +83,10 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -156,10 +156,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { - p.execInfo.OutputInfo = &handler.OutputInfo{ - OutputURI: outputPath, - DeckURI: deckPath, +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + OutputURI: outputPath, + } + } else { + p.execInfo.OutputInfo.OutputURI = outputPath } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -183,7 +186,43 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") - return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil + return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil +} + +// AddDeckURI incorporates the deck URI into the plugin execution info regardless of whether the URI exists in remote storage or not. +func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) { + var deckURI *storage.DataReference + deckURIValue := tCtx.ow.GetDeckPath() + deckURI = &deckURIValue + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + DeckURI: deckURI, + } + } else { + p.execInfo.OutputInfo.DeckURI = deckURI + } + +} + +// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. +func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { + reader := tCtx.ow.GetReader() + if reader == nil && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + return nil + } + + exists, err := reader.DeckExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + return regErrors.Wrapf(err, "failed to check existence of deck file") + } + + if !exists && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + } + + return nil } // The plugin interface available especially for testing. @@ -462,8 +501,19 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality. + // The deck should be accessible even if the task is still running or has failed. + // It's possible that the deck URI may not exist in remote storage yet or will never be exist. + // So, it is console's responsibility to handle the case when the deck URI actually does not exist. + pluginTrns.AddDeckURI(ctx, tCtx) + switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } // ------------------------------------- // TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes // This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute @@ -502,18 +552,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } else { - var deckURI *storage.DataReference - if tCtx.ow.GetReader() != nil { - exists, err := tCtx.ow.GetReader().DeckExists(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - } else if exists { - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - } - } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), &event.TaskNodeMetadata{ CacheStatus: cacheStatus.GetCacheStatus(), CatalogKey: cacheStatus.GetMetadata(), @@ -523,6 +562,11 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseFailure). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } pluginTrns.ObservedFailure( &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), @@ -617,7 +661,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) return handler.UnknownTransition, err } - pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), entry) } else { logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String()) pluginTrns.PopulateCacheInfo(entry)