Skip to content

Commit

Permalink
Add custominfo to agents (#5604)
Browse files Browse the repository at this point in the history
Signed-off-by: ddl-rliu <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
ddl-rliu authored and pmahindrakar-oss committed Sep 9, 2024
1 parent 1efba86 commit d8b7eb7
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
32 changes: 18 additions & 14 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -39,10 +40,11 @@ type Plugin struct {
type ResourceWrapper struct {
Phase flyteIdl.TaskExecution_Phase
// Deprecated: Please Use Phase instead.
State admin.State
Outputs *flyteIdl.LiteralMap
Message string
LogLinks []*flyteIdl.TaskLog
State admin.State
Outputs *flyteIdl.LiteralMap
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
}

// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
Expand Down Expand Up @@ -192,10 +194,11 @@ func (p *Plugin) ExecuteTaskSync(
}

return nil, ResourceWrapper{
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
CustomInfo: resource.CustomInfo,
}, err
}

Expand All @@ -221,11 +224,12 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
}

return ResourceWrapper{
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
CustomInfo: res.Resource.CustomInfo,
}, nil
}

Expand Down Expand Up @@ -254,7 +258,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error

func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
resource := taskCtx.Resource().(ResourceWrapper)
taskInfo := &core.TaskInfo{Logs: resource.LogLinks}
taskInfo := &core.TaskInfo{Logs: resource.LogLinks, CustomInfo: resource.CustomInfo}

switch resource.Phase {
case flyteIdl.TaskExecution_QUEUED:
Expand Down
16 changes: 12 additions & 4 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"

agentMocks "github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -114,17 +115,24 @@ func TestPlugin(t *testing.T) {
})

t.Run("test RUNNING Status", func(t *testing.T) {
simpleStruct := structpb.Struct{
Fields: map[string]*structpb.Value{
"foo": {Kind: &structpb.Value_StringValue{StringValue: "foo"}},
},
}
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(ResourceWrapper{
State: admin.State_RUNNING,
Outputs: nil,
Message: "Job is running",
LogLinks: []*flyteIdlCore.TaskLog{{Uri: "http://localhost:3000/log", Name: "Log Link"}},
State: admin.State_RUNNING,
Outputs: nil,
Message: "Job is running",
LogLinks: []*flyteIdlCore.TaskLog{{Uri: "http://localhost:3000/log", Name: "Log Link"}},
CustomInfo: &simpleStruct,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase())
assert.Equal(t, &simpleStruct, phase.Info().CustomInfo)
})

t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) {
Expand Down

0 comments on commit d8b7eb7

Please sign in to comment.