Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions events/admin_eventsink_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build integration
// +build integration

// Add this tag to your project settings if you want to pick it up.

package events
Expand Down Expand Up @@ -30,11 +32,17 @@ var (
)

// To run this test, and see if the deadline working, pick an existing successful execution from your admin database
// select * from executions;
//
// select * from executions;
//
// Then delete all the events from it.
// delete from execution_events where execution_name = 'ikuy55mn0y';
//
// delete from execution_events where execution_name = 'ikuy55mn0y';
//
// Then run this
// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work;
//
// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work;
//
// This will lock your table so that admin can't read it, causing the grpc call to timeout.
// On timeout, you should get a deadline exceeded error. Otherwise, you should get an error to the effect of
// "Invalid phase change from SUCCEEDED to RUNNING" or something like that.
Expand Down
84 changes: 64 additions & 20 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func getPluginMetricKey(pluginID, taskType string) string {
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -156,10 +156,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -183,7 +186,43 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// AddDeckURI incorporates the deck URI into the plugin execution info regardless of whether the URI exists in remote storage or not.
func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
DeckURI: deckURI,
}
} else {
p.execInfo.OutputInfo.DeckURI = deckURI
}

}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -462,8 +501,19 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never be exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(ctx, tCtx)

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand Down Expand Up @@ -502,18 +552,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
&event.TaskNodeMetadata{
CacheStatus: cacheStatus.GetCacheStatus(),
CatalogKey: cacheStatus.GetMetadata(),
Expand All @@ -523,6 +562,11 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
Expand Down Expand Up @@ -617,7 +661,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
return handler.UnknownTransition, err
}

pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry)
pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), entry)
} else {
logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String())
pluginTrns.PopulateCacheInfo(entry)
Expand Down