Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key-value execution tags #5453

Merged
merged 17 commits into from
Jun 8, 2024
1 change: 1 addition & 0 deletions flyteadmin/pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
Signal = "s"
AdminTag = "at"
ExecutionAdminTag = "eat"
ExecutionTag = "et"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
12 changes: 11 additions & 1 deletion flyteadmin/pkg/common/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@
if entity == Execution && executionIdentifierFields[field] {
return fmt.Sprintf("execution_%s", field)
}
// admin_tag table has been migrated to an execution_tag table, so we need to customize the field name.
if entity == AdminTag && field == "name" {
return "key"

Check warning on line 261 in flyteadmin/pkg/common/filters.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/common/filters.go#L261

Added line #L261 was not covered by tests
}
return field
}

Expand All @@ -265,6 +269,10 @@
if entity == NamedEntity && entityMetadataFields[field] {
return NamedEntityMetadata
}
// admin_tag table has been migrated to an execution_tag table.
if entity == AdminTag {
return ExecutionTag

Check warning on line 274 in flyteadmin/pkg/common/filters.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/common/filters.go#L274

Added line #L274 was not covered by tests
}
return entity
}

Expand All @@ -289,8 +297,10 @@
return nil, GetInvalidRepeatedValueFilterErr(function)
}
customizedField := customizeField(field, entity)
customizedEntity := customizeEntity(field, entity)

return &inlineFilterImpl{
entity: entity,
entity: customizedEntity,
function: function,
field: customizedField,
repeatedValue: repeatedValue,
Expand Down
66 changes: 38 additions & 28 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,29 +819,29 @@

func (m *ExecutionManager) launchExecutionAndPrepareModel(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
context.Context, *models.Execution, []*models.ExecutionTag, error) {

err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration())
if err != nil {
logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
return nil, nil, nil, err
}
if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK {
logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan)
// When tasks can have defaults this will need to handle Artifacts as well.
ctx, model, err := m.launchSingleTaskExecution(ctx, request, requestedAt)
return ctx, model, err
return ctx, model, nil, err
}

launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan)
if err != nil {
logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
return nil, nil, nil, err

Check warning on line 839 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L839

Added line #L839 was not covered by tests
}
launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel)
if err != nil {
logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err)
return nil, nil, err
return nil, nil, nil, err

Check warning on line 844 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L844

Added line #L844 was not covered by tests
}

var lpExpectedInputs *core.ParameterMap
Expand All @@ -860,23 +860,23 @@
logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+
"fixed inputs: %+v and expected inputs: %+v with err %v",
request.Inputs, launchPlan.Spec.FixedInputs, lpExpectedInputs, err)
return nil, nil, err
return nil, nil, nil, err
}

workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err

Check warning on line 869 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L869

Added line #L869 was not covered by tests
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err

Check warning on line 874 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L874

Added line #L874 was not covered by tests
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
return nil, nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
Expand All @@ -900,7 +900,7 @@
var sourceExecutionID uint
parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 903 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L903

Added line #L903 was not covered by tests
}

// Dynamically assign task resource defaults.
Expand All @@ -914,32 +914,32 @@

inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 917 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L917

Added line #L917 was not covered by tests
}
userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 921 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L921

Added line #L921 was not covered by tests
}

executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 926 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L926

Added line #L926 was not covered by tests
}

namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)

labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 934 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L934

Added line #L934 was not covered by tests
}
labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 938 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L938

Added line #L938 was not covered by tests
}
annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 942 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L942

Added line #L942 was not covered by tests
}
var rawOutputDataConfig *admin.RawOutputDataConfig
if executionConfig.RawOutputDataConfig != nil {
Expand All @@ -948,7 +948,7 @@

clusterAssignment, err := m.getClusterAssignment(ctx, &request)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 951 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L951

Added line #L951 was not covered by tests
}

var executionClusterLabel *admin.ExecutionClusterLabel
Expand All @@ -972,7 +972,7 @@

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
if err != nil {
return nil, nil, err
return nil, nil, nil, err

Check warning on line 975 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L975

Added line #L975 was not covered by tests
}
if overrides != nil {
executionParameters.TaskPluginOverrides = overrides
Expand Down Expand Up @@ -1041,21 +1041,28 @@
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, err
return nil, nil, nil, err

Check warning on line 1044 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L1044

Added line #L1044 was not covered by tests
}

return ctx, executionModel, nil
executionTagModel, err := transformers.CreateExecutionTagModel(createExecModelInput)
if err != nil {
logger.Infof(ctx, "Failed to create execution tag model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, nil, err

Check warning on line 1051 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L1049-L1051

Added lines #L1049 - L1051 were not covered by tests
}

return ctx, executionModel, executionTagModel, nil
}

// Inserts an execution model into the database store and emits platform metrics.
func (m *ExecutionManager) createExecutionModel(
ctx context.Context, executionModel *models.Execution) (*core.WorkflowExecutionIdentifier, error) {
ctx context.Context, executionModel *models.Execution, executionTagModel []*models.ExecutionTag) (*core.WorkflowExecutionIdentifier, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: executionModel.ExecutionKey.Project,
Domain: executionModel.ExecutionKey.Domain,
Name: executionModel.ExecutionKey.Name,
}
err := m.db.ExecutionRepo().Create(ctx, *executionModel)
err := m.db.ExecutionRepo().Create(ctx, *executionModel, executionTagModel)
if err != nil {
logger.Debugf(ctx, "failed to save newly created execution [%+v] with id %+v to db with err %v",
workflowExecutionIdentifier, workflowExecutionIdentifier, err)
Expand All @@ -1077,12 +1084,13 @@
request.Inputs = request.GetSpec().GetInputs()
}
var executionModel *models.Execution
var executionTagModel []*models.ExecutionTag
var err error
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt)
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt)
if err != nil {
return nil, err
}
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1127,7 +1135,8 @@
executionSpec.Metadata.ReferenceExecution = existingExecution.Id
executionSpec.OverwriteCache = request.GetOverwriteCache()
var executionModel *models.Execution
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
var executionTagModel []*models.ExecutionTag
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
Project: request.Id.Project,
Domain: request.Id.Domain,
Name: request.Name,
Expand All @@ -1138,7 +1147,7 @@
return nil, err
}
executionModel.SourceExecutionID = existingExecutionModel.ID
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1178,7 +1187,8 @@
executionSpec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED
executionSpec.Metadata.ReferenceExecution = existingExecution.Id
var executionModel *models.Execution
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
var executionTagModel []*models.ExecutionTag
ctx, executionModel, executionTagModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
Project: request.Id.Project,
Domain: request.Id.Domain,
Name: request.Name,
Expand All @@ -1189,7 +1199,7 @@
return nil, err
}
executionModel.SourceExecutionID = existingExecutionModel.ID
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel)
workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, executionTagModel)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion flyteadmin/pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var filterFieldEntityPrefix = map[string]common.Entity{
"signal": common.Signal,
"admin_tag": common.AdminTag,
"execution_admin_tag": common.ExecutionAdminTag,
"execution_tag": common.ExecutionTag,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down Expand Up @@ -121,7 +122,7 @@ func prepareValues(field string, values []string) (interface{}, error) {
}

var allowedJoinEntities = map[common.Entity]sets.String{
common.Execution: sets.NewString(common.Execution, common.LaunchPlan, common.Workflow, common.Task, common.AdminTag),
common.Execution: sets.NewString(common.Execution, common.LaunchPlan, common.Workflow, common.Task, common.AdminTag, common.ExecutionTag),
common.LaunchPlan: sets.NewString(common.LaunchPlan, common.Workflow),
common.NodeExecution: sets.NewString(common.NodeExecution, common.Execution),
common.NodeExecutionEvent: sets.NewString(common.NodeExecutionEvent),
Expand All @@ -133,6 +134,7 @@ var allowedJoinEntities = map[common.Entity]sets.String{
common.Project: sets.NewString(common.Project),
common.Signal: sets.NewString(common.Signal),
common.AdminTag: sets.NewString(common.AdminTag),
common.ExecutionTag: sets.NewString(common.ExecutionTag),
}

var entityColumns = map[common.Entity]sets.String{
Expand All @@ -148,6 +150,7 @@ var entityColumns = map[common.Entity]sets.String{
common.Project: models.ProjectColumns,
common.Signal: models.SignalColumns,
common.AdminTag: models.AdminTagColumns,
common.ExecutionTag: models.ExecutionTagColumns,
}

func ParseFilters(filterParams string, primaryEntity common.Entity) ([]common.InlineFilter, error) {
Expand Down
55 changes: 55 additions & 0 deletions flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,61 @@
return tx.Table("launch_plans").Migrator().DropColumn(&models.LaunchPlan{}, "launch_condition_type")
},
},

{
ID: "2024-06-06-execution-tags",
Migrate: func(tx *gorm.DB) error {
type ExecutionKey struct {
Project string `gorm:"primary_key;column:execution_project;size:255"`
Domain string `gorm:"primary_key;column:execution_domain;size:255"`
Name string `gorm:"primary_key;column:execution_name;size:255"`

Check warning on line 1232 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1228-L1232

Added lines #L1228 - L1232 were not covered by tests
}

type ExecutionTag struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time `gorm:"type:time"`
UpdatedAt time.Time `gorm:"type:time"`
DeletedAt *time.Time `gorm:"index"`
ExecutionKey
Key string `gorm:"primary_key;index:tag_key;size:255"`
Value string `gorm:"primary_key;index:tag_value;size:255"`

Check warning on line 1242 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1235-L1242

Added lines #L1235 - L1242 were not covered by tests
}

return tx.Transaction(func(_ *gorm.DB) error {

Check warning on line 1245 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1245

Added line #L1245 was not covered by tests
// Create an execution_tags Table
if err := tx.AutoMigrate(&ExecutionTag{}); err != nil {
return err

Check warning on line 1248 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1247-L1248

Added lines #L1247 - L1248 were not covered by tests
}
// Drop execution_admin_tags and admin_tags tables, and create a new table execution_tags
// to store tags associated with executions.
sql := "INSERT INTO execution_tags (execution_project, execution_domain, execution_name, created_at, updated_at, deleted_at, key, value)" +
" SELECT execution_project, execution_domain, execution_name, created_at, updated_at, deleted_at, name as key, null as value" +
" FROM execution_admin_tags" +
" INNER JOIN admin_tags a on execution_admin_tags.admin_tag_id = a.id;"
if err := tx.Exec(sql).Error; err != nil {
return err

Check warning on line 1257 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1252-L1257

Added lines #L1252 - L1257 were not covered by tests
}
return nil

Check warning on line 1259 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1259

Added line #L1259 was not covered by tests
})
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_tags")
},

Check warning on line 1264 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1262-L1264

Added lines #L1262 - L1264 were not covered by tests
},

{
ID: "2024-06-06-drop-execution_admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_admin_tags")
},

Check warning on line 1271 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1269-L1271

Added lines #L1269 - L1271 were not covered by tests
},

{
ID: "2024-06-06-drop-admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("admin_tags")
},

Check warning on line 1278 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1276-L1278

Added lines #L1276 - L1278 were not covered by tests
},
}

var m = append(LegacyMigrations, NoopMigrations...)
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ const taskExecutionTableName = "task_executions"
const taskTableName = "tasks"
const workflowTableName = "workflows"
const descriptionEntityTableName = "description_entities"
const AdminTagsTableName = "admin_tags"
const executionAdminTagsTableName = "execution_admin_tags"
const executionTagsTableName = "execution_tags"

const limit = "limit"
const filters = "filters"
Expand All @@ -49,6 +48,7 @@ var entityToTableName = map[common.Entity]string{
common.Signal: "signals",
common.AdminTag: "admin_tags",
common.ExecutionAdminTag: "execution_admin_tags",
common.ExecutionTag: "execution_tags",
}

var innerJoinExecToNodeExec = fmt.Sprintf(
Expand Down
Loading
Loading