diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 9713ba90f3..2ceba6f43e 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -112,16 +112,16 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR taskTemplate.GetContainer().Args = argTemplate } - return &ResourceMetaWrapper{ + return ResourceMetaWrapper{ OutputPrefix: outputPrefix, AgentResourceMeta: res.GetResourceMeta(), Token: "", TaskType: taskTemplate.Type, - }, &ResourceWrapper{State: admin.State_RUNNING}, nil + }, ResourceWrapper{State: admin.State_RUNNING}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - metadata := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper) agent, err := getFinalAgent(metadata.TaskType, p.cfg) if err != nil { @@ -140,7 +140,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } - return &ResourceWrapper{ + return ResourceWrapper{ State: res.Resource.State, Outputs: res.Resource.Outputs, }, nil @@ -169,7 +169,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - resource := taskCtx.Resource().(*ResourceWrapper) + resource := taskCtx.Resource().(ResourceWrapper) taskInfo := &core.TaskInfo{} switch resource.State { @@ -190,7 +190,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase return core.PhaseInfoUndefined, pluginErrors.Errorf(core.SystemErrorCode, "unknown execution phase [%v].", resource.State) } -func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource *ResourceWrapper) error { +func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource ResourceWrapper) error { taskTemplate, err := taskCtx.TaskReader().Read(ctx) if err != nil { return err diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 7626cb9c6b..f911c8bfb6 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -146,12 +146,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR } runID := fmt.Sprintf("%v", data["run_id"]) - return &ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, - &ResourceWrapper{StatusCode: resp.StatusCode}, nil + return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, + ResourceWrapper{StatusCode: resp.StatusCode}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, nil, p.cfg.databricksEndpoint, p.cfg.DatabricksInstance, exec.Token, exec.RunID, false) if err != nil { @@ -176,7 +176,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba jobID := fmt.Sprintf("%.0f", data["job_id"]) lifeCycleState := fmt.Sprintf("%s", jobState["life_cycle_state"]) resultState := fmt.Sprintf("%s", jobState["result_state"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, JobID: jobID, LifeCycleState: lifeCycleState, @@ -206,8 +206,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - resource := taskCtx.Resource().(*ResourceWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + resource := taskCtx.Resource().(ResourceWrapper) message := resource.Message statusCode := resource.StatusCode jobID := resource.JobID diff --git a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go index 88bb8f50f5..3c7871880a 100644 --- a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go @@ -139,12 +139,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR queryID := fmt.Sprintf("%v", data["statementHandle"]) message := fmt.Sprintf("%v", data["message"]) - return &ResourceMetaWrapper{queryID, queryInfo.Account, token}, - &ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil + return ResourceMetaWrapper{queryID, queryInfo.Account, token}, + ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, false) if err != nil { @@ -160,7 +160,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } message := fmt.Sprintf("%v", data["message"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, Message: message, }, nil @@ -170,7 +170,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error if taskCtx.ResourceMeta() == nil { return nil } - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(post, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, true) if err != nil { @@ -187,8 +187,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(_ context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - statusCode := taskCtx.Resource().(*ResourceWrapper).StatusCode + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + statusCode := taskCtx.Resource().(ResourceWrapper).StatusCode if statusCode == 0 { return core.PhaseInfoUndefined, errors.Errorf(ErrSystem, "No Status field set.") } @@ -276,7 +276,7 @@ func newSnowflakeJobTaskPlugin() webapi.PluginEntry { ID: "snowflake", SupportedTaskTypes: []core.TaskType{"snowflake"}, PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { - return &Plugin{ + return Plugin{ metricScope: iCtx.MetricsScope(), cfg: GetConfig(), client: &http.Client{}, diff --git a/rsts/deployment/index.rst b/rsts/deployment/index.rst index 8f569eb843..820e941bc4 100644 --- a/rsts/deployment/index.rst +++ b/rsts/deployment/index.rst @@ -36,7 +36,7 @@ plugins, authentication, performance tuning, and maintaining Flyte as a producti :text: 🤖 Agent Setup :classes: btn-block stretched-link ^^^^^^^^^^^^ - Enable Flyte agents to extend Flyte's capabilities, including features like File sesnor, Databricks job, and Snowflake query services. + Enable Flyte agents to extend Flyte's capabilities, including features like File sensor, Databricks job, and Snowflake query services. ---