Skip to content

Commit

Permalink
Add flyteconsole url to FlyteWorkflow CRD (#5449)
Browse files Browse the repository at this point in the history
* Add `ConsoleURL` to `FlyteWorkflow` CRD

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add `ConsoleURL` to flyteadmin config and write it to CRD

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add ConsoleURL to application_config_provider and DeepCopyInto

Signed-off-by: Eduardo Apolinario <[email protected]>

* wip

Signed-off-by: Eduardo Apolinario <[email protected]>

* more wip

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix flyteplugins unit tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix existing propeller unit tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add a few unit tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Include FLYTE_EXECUTION_URL iff "link_type" is set in the task template

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove TODOs

Signed-off-by: Eduardo Apolinario <[email protected]>

* Only include consoleURL if task set the relevant bit in its task template

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix flyteplugins tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove attempt number from the url

Signed-off-by: Eduardo Apolinario <[email protected]>

* Include review feedback

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario and eapolinario authored Jun 14, 2024
1 parent bba8c11 commit 7d788cb
Show file tree
Hide file tree
Showing 47 changed files with 1,144 additions and 387 deletions.
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
MaxParallelism: 25,
K8SServiceAccount: "",
UseOffloadedWorkflowClosure: false,
ConsoleURL: "",
})

var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type ApplicationConfig struct {
Envs map[string]string `json:"envs,omitempty"`

FeatureGates FeatureGates `json:"featureGates" pflag:",Enable experimental features."`

// A URL pointing to the flyteconsole instance used to hit this flyteadmin instance.
ConsoleURL string `json:"consoleUrl,omitempty" pflag:",A URL pointing to the flyteconsole instance used to hit this flyteadmin instance."`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down
4 changes: 4 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
flyteWf.Tasks = nil
}

if consoleURL := e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL; len(consoleURL) > 0 {
flyteWf.ConsoleURL = consoleURL
}

executionTargetSpec := executioncluster.ExecutionTargetSpec{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ type TaskExecutionMetadata interface {
GetPlatformResources() *v1.ResourceRequirements
GetInterruptibleFailureThreshold() int32
GetEnvironmentVariables() map[string]string
GetConsoleURL() string
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (e ErrorCollection) Error() string {

// Parameters struct is used by the Templating Engine to replace the templated parameters
type Parameters struct {
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
IncludeConsoleURL bool
}

// Render Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.
}
container.Args = modifiedArgs

container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID())
// The flyteconsole url is added based on the `IncludeConsoleURL` bit set via the task template
consoleURL := ""
if parameters.IncludeConsoleURL {
consoleURL = parameters.TaskExecMetadata.GetConsoleURL()
}
container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID(), consoleURL)

// retrieve platformResources and overrideResources to use when aggregating container resources
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func TestToK8sContainer(t *testing.T) {
"foo": "bar",
})
mockTaskExecMetadata.OnGetNamespace().Return("my-namespace")
mockTaskExecMetadata.OnGetConsoleURL().Return("")

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
Expand Down Expand Up @@ -447,9 +448,10 @@ func TestToK8sContainer(t *testing.T) {
assert.False(t, *container.SecurityContext.AllowPrivilegeEscalation)
}

func getTemplateParametersForTest(resourceRequirements, platformResources *v1.ResourceRequirements) template.Parameters {
func getTemplateParametersForTest(resourceRequirements, platformResources *v1.ResourceRequirements, includeConsoleURL bool, consoleURL string) template.Parameters {
mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
mockTaskExecutionID := mocks.TaskExecutionID{}
mockTaskExecutionID.OnGetUniqueNodeID().Return("unique_node_id")
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
Expand Down Expand Up @@ -477,6 +479,7 @@ func getTemplateParametersForTest(resourceRequirements, platformResources *v1.Re
mockTaskExecMetadata.OnGetPlatformResources().Return(platformResources)
mockTaskExecMetadata.OnGetEnvironmentVariables().Return(nil)
mockTaskExecMetadata.OnGetNamespace().Return("my-namespace")
mockTaskExecMetadata.OnGetConsoleURL().Return(consoleURL)

mockInputReader := mocks2.InputReader{}
mockInputPath := storage.DataReference("s3://input/path")
Expand All @@ -492,9 +495,10 @@ func getTemplateParametersForTest(resourceRequirements, platformResources *v1.Re
mockOutputPath.OnGetPreviousCheckpointsPrefix().Return("/prev")

return template.Parameters{
TaskExecMetadata: &mockTaskExecMetadata,
Inputs: &mockInputReader,
OutputPath: &mockOutputPath,
TaskExecMetadata: &mockTaskExecMetadata,
Inputs: &mockInputReader,
OutputPath: &mockOutputPath,
IncludeConsoleURL: includeConsoleURL,
}
}

Expand All @@ -506,7 +510,7 @@ func TestAddFlyteCustomizationsToContainer(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceEphemeralStorage: resource.MustParse("2048Mi"),
},
}, nil)
}, nil, false, "")
container := &v1.Container{
Command: []string{
"{{ .Input }}",
Expand Down Expand Up @@ -554,7 +558,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("1")))
Expand All @@ -577,7 +581,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("1")))
Expand Down Expand Up @@ -612,7 +616,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
v1.ResourceCPU: resource.MustParse("10"),
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("10")))
Expand Down Expand Up @@ -649,7 +653,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
templateParameters := getTemplateParametersForTest(&v1.ResourceRequirements{
Requests: overrideRequests,
Limits: overrideLimits,
}, &v1.ResourceRequirements{})
}, &v1.ResourceRequirements{}, false, "")

err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
Expand Down Expand Up @@ -684,7 +688,7 @@ func TestAddFlyteCustomizationsToContainer_ValidateExistingResources(t *testing.
v1.ResourceCPU: resource.MustParse("10"),
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeEnsureExistingResourcesInRange, container)
assert.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package flytek8s

import (
"context"
"fmt"
"os"
"strconv"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -13,6 +15,10 @@ import (
"github.com/flyteorg/flyte/flytestdlib/contextutils"
)

const (
flyteExecutionURL = "FLYTE_EXECUTION_URL"
)

func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar {
var envVars []v1.EnvVar

Expand All @@ -32,7 +38,7 @@ func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar {
return envVars
}

func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar {

if id == nil || id.GetID().NodeExecutionId == nil || id.GetID().NodeExecutionId.ExecutionId == nil {
return []v1.EnvVar{}
Expand Down Expand Up @@ -69,6 +75,14 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
// },
}

if len(consoleURL) > 0 {
consoleURL = strings.TrimRight(consoleURL, "/")
envVars = append(envVars, v1.EnvVar{
Name: flyteExecutionURL,
Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s/nodes", consoleURL, nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID()),
})
}

// Task definition Level env variables.
if id.GetID().TaskId != nil {
taskID := id.GetID().TaskId
Expand Down Expand Up @@ -113,9 +127,9 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
return envVars
}

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID) ([]v1.EnvVar, []v1.EnvFromSource) {
func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) {
envVars = append(envVars, GetContextEnvVars(ctx)...)
envVars = append(envVars, GetExecutionEnvVars(id)...)
envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...)

for k, v := range taskEnvironmentVariables {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -18,8 +19,53 @@ import (

func TestGetExecutionEnvVars(t *testing.T) {
mock := mockTaskExecutionIdentifier{}
envVars := GetExecutionEnvVars(mock)
assert.Len(t, envVars, 12)
tests := []struct {
name string
expectedEnvVars int
consoleURL string
expectedEnvVar *v12.EnvVar
}{
{
"no-console-url",
12,
"",
nil,
},
{
"with-console-url",
13,
"scheme://host/path",
&v12.EnvVar{
Name: "FLYTE_EXECUTION_URL",
Value: "scheme://host/path/projects/proj/domains/domain/executions/name/nodeId/unique-node-id/nodes",
},
},
{
"with-console-url-ending-in-single-slash",
13,
"scheme://host/path/",
&v12.EnvVar{
Name: "FLYTE_EXECUTION_URL",
Value: "scheme://host/path/projects/proj/domains/domain/executions/name/nodeId/unique-node-id/nodes",
},
},
{
"with-console-url-ending-in-multiple-slashes",
13,
"scheme://host/path////",
&v12.EnvVar{
Name: "FLYTE_EXECUTION_URL",
Value: "scheme://host/path/projects/proj/domains/domain/executions/name/nodeId/unique-node-id/nodes",
},
},
}
for _, tt := range tests {
envVars := GetExecutionEnvVars(mock, tt.consoleURL)
assert.Len(t, envVars, tt.expectedEnvVars)
if tt.expectedEnvVar != nil {
assert.True(t, proto.Equal(&envVars[4], tt.expectedEnvVar))
}
}
}

func TestGetTolerationsForResources(t *testing.T) {
Expand Down Expand Up @@ -257,7 +303,7 @@ func TestDecorateEnvVars(t *testing.T) {
defer os.Setenv("value", originalEnvVal)

expected := append(defaultEnv, GetContextEnvVars(ctx)...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{})...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{}, "")...)

aggregated := append(expected, v12.EnvVar{Name: "k", Value: "v"})
type args struct {
Expand All @@ -270,20 +316,21 @@ func TestDecorateEnvVars(t *testing.T) {
additionEnvVar map[string]string
additionEnvVarFromEnv map[string]string
executionEnvVar map[string]string
consoleURL string
want []v12.EnvVar
}{
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, aggregated},
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, "", expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, "", aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, "", aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, "", aggregated},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultEnvVars: tt.additionEnvVar,
DefaultEnvVarsFromEnv: tt.additionEnvVarFromEnv,
}))
if got, _ := DecorateEnvVars(ctx, tt.args.envVars, tt.executionEnvVar, tt.args.id); !reflect.DeepEqual(got, tt.want) {
if got, _ := DecorateEnvVars(ctx, tt.args.envVars, tt.executionEnvVar, tt.args.id, tt.consoleURL); !reflect.DeepEqual(got, tt.want) {
t.Errorf("DecorateEnvVars() = %v, want %v", got, tt.want)
}
})
Expand Down
22 changes: 18 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,19 @@ func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v
return podSpec, &objectMeta, primaryContainerName, nil
}

func hasExternalLinkType(taskTemplate *core.TaskTemplate) bool {
if taskTemplate == nil {
return false
}
config := taskTemplate.GetConfig()
if config == nil {
return false
}
// The presence of any "link_type" is sufficient to guarantee that the console URL should be included.
_, exists := config["link_type"]
return exists
}

// ApplyFlytePodConfiguration updates the PodSpec and ObjectMeta with various Flyte configuration. This includes
// applying default k8s configuration, applying overrides (resources etc.), injecting copilot containers, and merging with the
// configuration PodTemplate (if exists).
Expand All @@ -328,10 +341,11 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut

// add flyte resource customizations to containers
templateParameters := template.Parameters{
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
IncludeConsoleURL: hasExternalLinkType(taskTemplate),
}

resourceRequests := make([]v1.ResourceRequirements, 0, len(podSpec.Containers))
Expand Down
Loading

0 comments on commit 7d788cb

Please sign in to comment.