Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytepropeller][flyteadmin] Streaming Decks #5799

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.Event.DeckUri
return nil
}

Expand Down
84 changes: 65 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {

Check warning on line 74 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L74

Added line #L74 was not covered by tests
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})

Check warning on line 77 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L77

Added line #L77 was not covered by tests
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +144,13 @@
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}

Check warning on line 151 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L149-L151

Added lines #L149 - L151 were not covered by tests
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,9 +174,43 @@
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// AddDeckURI records the deck path reported by the task's output writer on the
// plugin execution info. The OutputInfo is created first when it is nil, so any
// fields already set on an existing OutputInfo are left untouched. No check is
// made here as to whether the deck actually exists in remote storage.
//
// ctx is accepted but not used by this method.
func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) {
	// Take a local copy so the stored pointer refers to its own value.
	deckPath := tCtx.ow.GetDeckPath()

	if p.execInfo.OutputInfo == nil {
		p.execInfo.OutputInfo = &handler.OutputInfo{}
	}
	p.execInfo.OutputInfo.DeckURI = &deckPath
}

//// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
//func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
// reader := tCtx.ow.GetReader()
// if reader == nil && p.execInfo.OutputInfo != nil {
// p.execInfo.OutputInfo.DeckURI = nil
// return nil
// }
//
// exists, err := reader.DeckExists(ctx)
// if err != nil {
// logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
// return regErrors.Wrapf(err, "failed to check existence of deck file")
// }
//
// if !exists && p.execInfo.OutputInfo != nil {
// p.execInfo.OutputInfo.DeckURI = nil
// }
//
// return nil
//}

// The plugin interface available especially for testing.
type PluginRegistryIface interface {
GetCorePlugins() []pluginCore.PluginEntry
Expand Down Expand Up @@ -464,8 +501,20 @@
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// The deck URI may not exist in remote storage yet, or may never exist at all.
// It is therefore the console's responsibility to handle the case where the deck URI does not actually exist.
pluginTrns.AddDeckURI(ctx, tCtx)

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
//// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
//err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
//if err != nil {
// return pluginTrns, err
//}

// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand All @@ -491,6 +540,7 @@
outputCommitter := ioutils.NewRemoteFileOutputWriter(ctx, tCtx.DataStore(), tCtx.OutputWriter())
ee, err := t.ValidateOutput(ctx, tCtx.NodeID(), tCtx.InputReader(), tCtx.ow.GetReader(),
outputCommitter, tCtx.ExecutionContext().GetExecutionConfig(), tCtx.tr)

if err != nil {
return nil, err
}
Expand All @@ -501,25 +551,21 @@
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
&event.TaskNodeMetadata{
//CacheStatus: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCacheStatus(),
//CatalogKey: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCatalogKey(),
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
//// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseFailure).
//err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
//if err != nil {
// return pluginTrns, err
//}

Check warning on line 568 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L564-L568

Added lines #L564 - L568 were not covered by tests
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
Expand Down
Loading