Skip to content

Commit

Permalink
Infra 2881 team tagging (#146)
Browse files Browse the repository at this point in the history
* added default tags to new work flow definition [requests]

* made change subscribe to swagger naming convention

* executor now WorkflowDefinition.DefaultTags when empty or nil tags passed to CreateWorkflow

* add tags to default instead of replacing

* added check for defaultTags not being modified by CreateWorkflow() and addressed nits

* fixed copy by reference in test

* updated swagger.yml version
  • Loading branch information
dhspaeth12 authored Sep 26, 2018
1 parent 64f946a commit 0d95e2c
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ PKG = github.com/Clever/$(APP_NAME)
PKGS := $(shell go list ./... | grep -v /vendor | grep -v /gen-go | grep -v /workflow-ops | grep -v /dynamodb)
PKGS := $(PKGS) $(PKG)/gen-go/server/db/dynamodb

WAG_VERSION := latest
# Currently using old version because ```WAG_VERSION := latest``` is broken on this repo
WAG_VERSION := v1.7.2

$(eval $(call golang-version-check,1.10))

Expand Down
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions


### Version information
*Version* : 0.9.1
*Version* : 0.9.2


### URI scheme
Expand Down
11 changes: 10 additions & 1 deletion executor/workflow_manager_sfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,19 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work
return nil, err
}

mergedTags := map[string]interface{}{}
for k, v := range wd.DefaultTags {
mergedTags[k] = v
}
// tags passed to CreateWorkflow overwrite wd.DefaultTags upon key conflict
for k, v := range tags {
mergedTags[k] = v
}

// save the workflow before starting execution to ensure we don't have untracked executions
// i.e. execution was started but we failed to save workflow
// If we fail starting the execution, we can resolve this out of band (TODO: should support cancelling)
workflow := resources.NewWorkflow(&wd, input, namespace, queue, tags)
workflow := resources.NewWorkflow(&wd, input, namespace, queue, mergedTags)
if err := wm.store.SaveWorkflow(ctx, *workflow); err != nil {
return nil, err
}
Expand Down
61 changes: 61 additions & 0 deletions executor/workflow_manager_sfn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func TestCreateWorkflow(t *testing.T) {
SendMessageWithContext(gomock.Any(), gomock.Any()).
Return(&sqs.SendMessageOutput{}, nil)

defaultTags := map[string]interface{}{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
}
assert.Equal(t, c.workflowDefinition.DefaultTags, defaultTags)
workflow, err := c.manager.CreateWorkflow(ctx, *c.workflowDefinition,
input,
"namespace",
Expand All @@ -176,6 +182,11 @@ func TestCreateWorkflow(t *testing.T) {
assert.Equal(t, workflow.Namespace, "namespace")
assert.Equal(t, workflow.Input, input)

// Create called without tags, so tags should match c.workflowDefinition.DefaultTags
assert.Equal(t, workflow.Tags, defaultTags)
// Ensure workflow definition tags not modified by CreateWorkflow()
assert.Equal(t, c.workflowDefinition.DefaultTags, defaultTags)

savedWorkflow, err := c.store.GetWorkflowByID(ctx, workflow.ID)
assert.Nil(t, err)
assert.Equal(t, workflow.CreatedAt.String(), savedWorkflow.CreatedAt.String())
Expand Down Expand Up @@ -216,6 +227,56 @@ func TestCreateWorkflow(t *testing.T) {
assert.Equal(t, workflow.ID, wfID)
})

t.Run("CreateWorkflow with added tags", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newSFNManagerTestController(t)
defer c.tearDown()
stateMachineArn := stateMachineARN(c.manager.region, c.manager.accountID,
c.workflowDefinition.Name,
c.workflowDefinition.Version,
"namespace",
c.workflowDefinition.StateMachine.StartAt,
)
c.mockSFNAPI.EXPECT().
DescribeStateMachine(&sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(stateMachineArn),
}).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
Return(&sfn.StartExecutionOutput{}, nil)
c.mockSQSAPI.EXPECT().
SendMessageWithContext(gomock.Any(), gomock.Any()).
Return(&sqs.SendMessageOutput{}, nil)

defaultTags := map[string]interface{}{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
}
workflow, err := c.manager.CreateWorkflow(ctx, *c.workflowDefinition,
input,
"namespace",
"queue",
map[string]interface{}{"newTag1": "newVal1", "newTag2": "newVal2"},
)
assert.Nil(t, err)
// Create called with tags, so they should be added to c.workflowDefinition.DefaultTags
// in the new workflow
assert.Equal(t, workflow.Tags, map[string]interface{}{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
"newTag1": "newVal1",
"newTag2": "newVal2",
})
// Ensure workflow definition tags not modified by CreateWorkflow()
assert.Equal(t, c.workflowDefinition.DefaultTags, defaultTags)
})

t.Run("CreateWorkflow deletes workflow on StartExecution failure", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
3 changes: 3 additions & 0 deletions gen-go/models/new_workflow_definition_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions gen-go/models/workflow_definition.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gen-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "workflow-manager",
"version": "0.9.1",
"version": "0.9.2",
"description": "Orchestrator for AWS Step Functions",
"main": "index.js",
"dependencies": {
Expand Down
7 changes: 6 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,12 @@ func newWorkflowDefinitionFromRequest(req models.NewWorkflowDefinitionRequest) (
numStates-len(req.StateMachine.States))
}

return resources.NewWorkflowDefinition(req.Name, req.Manager, req.StateMachine)
// verify request's default_tags (map[string]interface{}) are actually map[string]string
if err := validateTagsMap(req.DefaultTags); err != nil {
return nil, err
}

return resources.NewWorkflowDefinition(req.Name, req.Manager, req.StateMachine, req.DefaultTags)
}

// validateTagsMap ensures that all tags values are strings
Expand Down
5 changes: 5 additions & 0 deletions resources/kitchensink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func KitchenSinkWorkflowDefinition(t *testing.T) *models.WorkflowDefinition {
},
},
},
map[string]interface{}{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
},
)
assert.Nil(t, err)

Expand Down
7 changes: 6 additions & 1 deletion resources/workflow_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ import (
)

// NewWorkflowDefinition creates a new Workflow
func NewWorkflowDefinition(name string, manager models.Manager, stateMachine *models.SLStateMachine) (*models.WorkflowDefinition, error) {
func NewWorkflowDefinition(
name string, manager models.Manager,
stateMachine *models.SLStateMachine,
defaultTags map[string]interface{}) (*models.WorkflowDefinition, error) {
return &models.WorkflowDefinition{
ID: uuid.NewV4().String(),
Name: name,
Version: 0,
CreatedAt: strfmt.DateTime(time.Now()),
Manager: manager,
StateMachine: stateMachine,
DefaultTags: defaultTags,
}, nil
}

Expand All @@ -30,6 +34,7 @@ func NewWorkflowDefinitionVersion(def *models.WorkflowDefinition, version int) *
CreatedAt: strfmt.DateTime(time.Now()),
Manager: def.Manager,
StateMachine: def.StateMachine,
DefaultTags: def.DefaultTags,
}
}

Expand Down
1 change: 1 addition & 0 deletions resources/workflow_definitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestCopyWorflowDefinition(t *testing.T) {
assert.Equal(t, wf.Version, copy.Version)
assert.Equal(t, wf.Version, copy.Version)
assert.Equal(t, wf.Manager, copy.Manager)
assert.Equal(t, wf.DefaultTags, copy.DefaultTags)

t.Log("StateMachines are equal but pointers are different")
assert.Equal(t, wf.StateMachine, copy.StateMachine)
Expand Down
10 changes: 9 additions & 1 deletion swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ info:
description: Orchestrator for AWS Step Functions
# when changing the version here, make sure to
# re-run `make generate` to generate clients and server
version: 0.9.1
version: 0.9.2
x-npm-package: workflow-manager
schemes:
- http
Expand Down Expand Up @@ -387,6 +387,10 @@ definitions:
$ref: '#/definitions/Manager'
stateMachine:
$ref: '#/definitions/SLStateMachine'
defaultTags:
description: "defaultTags: object with key-value pairs; keys and values should be strings"
additionalProperties:
type: object

WorkflowDefinition:
x-db:
Expand All @@ -412,6 +416,10 @@ definitions:
$ref: '#/definitions/Manager'
stateMachine:
$ref: '#/definitions/SLStateMachine'
defaultTags:
description: "defaultTags: object with key-value pairs; keys and values should be strings"
additionalProperties:
type: object

Manager:
type: string
Expand Down

0 comments on commit 0d95e2c

Please sign in to comment.