Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add Pending State and State Message in Webapi Agent #399

Closed
3 changes: 3 additions & 0 deletions go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/x509"
"encoding/gob"
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/config"
Expand Down Expand Up @@ -162,6 +163,8 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
taskInfo := &core.TaskInfo{}

switch resource.State {
case admin.State_PENDING:
return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "Job Created"), nil
Future-Outlier marked this conversation as resolved.
Show resolved Hide resolved
case admin.State_RUNNING:
return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
case admin.State_PERMANENT_FAILURE:
Expand Down
105 changes: 104 additions & 1 deletion go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/config"

"google.golang.org/grpc"

pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
webapiPlugin "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi/mocks"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -99,4 +100,106 @@ func TestPlugin(t *testing.T) {
ctx, _ = getFinalContext(context.TODO(), "CreateTask", &Agent{Endpoint: "localhost:8080", Timeouts: map[string]config.Duration{"CreateTask": {Duration: 1 * time.Millisecond}}})
assert.NotEqual(t, context.TODO(), ctx)
})

t.Run("test PENDING Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

pingsutw marked this conversation as resolved.
Show resolved Hide resolved
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_PENDING,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, phase.Phase())
})

t.Run("test RUNNING Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_RUNNING,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase())
})

t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_PERMANENT_FAILURE,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase())
})

t.Run("test RETRYABLE_FAILURE Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_RETRYABLE_FAILURE,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phase.Phase())
})

t.Run("test SUCCEEDED Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_SUCCEEDED,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseSuccess, phase.Phase())
})

t.Run("test UNDEFINED Status", func(t *testing.T) {
plugin := Plugin{
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}
taskContext := new(webapiPlugin.StatusContext)

taskContext.On("Resource").Return(&ResourceWrapper{
State: 5,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.Error(t, err)
assert.Equal(t, pluginsCore.PhaseUndefined, phase.Phase())
})
}
Loading