From ec08ada654df3a03e598db0da607f94f1056596e Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Sat, 6 Mar 2021 18:30:28 -0800 Subject: [PATCH 1/2] Add Map state to embedded wfm. We already have parallel, so map is pretty similar! Also, I added validation to parallel's branches; I'm not actually sure if parallel allows zero branches, but it is conceivable I guess (the result of a parallel with no branches would be the empty array). --- embedded/embedded.go | 35 +++++++++++++++++++++++++++++++- embedded/embedded_test.go | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/embedded/embedded.go b/embedded/embedded.go index a002e92d..a925700a 100644 --- a/embedded/embedded.go +++ b/embedded/embedded.go @@ -218,7 +218,36 @@ func validateWorkflowDefinitionStates(wfd models.WorkflowDefinition, resources m if state.Seconds <= 0 { return errors.Errorf("invalid seconds parameter in wait %s.%s", wfd.Name, stateName) } - case models.SLStateTypeSucceed, models.SLStateTypeFail, models.SLStateTypeParallel: + case models.SLStateTypeMap: + checkNextState = true + if state.Iterator == nil { + return errors.Errorf("required parameter Iterator for Map state not given in state %s.%s", wfd.Name, stateName) + } + if state.MaxConcurrency < 0 { + return errors.Errorf("invalid MaxConcurrency in Map state %s.%s, must, cannot be negative, got %d", wfd.Name, stateName, state.MaxConcurrency) + } + innerWFD := models.WorkflowDefinition{ + Name: fmt.Sprintf("%s__Iterator", stateName), + StateMachine: state.Iterator, + } + if err := validateWorkflowDefinition(innerWFD, resources); err != nil { + return errors.Errorf("inside the Iterator for Map state %s.%s: %w", + wfd.Name, stateName, err, + ) + } + case models.SLStateTypeParallel: + for i, branch := range state.Branches { + innerWFD := models.WorkflowDefinition{ + Name: fmt.Sprintf("%s__Branch[%d]", stateName, i), + StateMachine: branch, + } + if err := validateWorkflowDefinition(innerWFD, resources); err != nil { + return errors.Errorf("inside the Branch[%d] for Parallel state %s.%s: %w", + i, wfd.Name, stateName, + ) + } + } + case models.SLStateTypeSucceed, models.SLStateTypeFail: // no op default: return errors.Errorf("invalid state type '%s' in %s.%s", state.Type, wfd.Name, stateName) @@ -326,6 +355,10 @@ func (e *Embedded) setStateMachineResources(i *models.StartWorkflowRequest, stat state.Branches[idx] = branch } stateMachine.States[stateName] = state + + case models.SLStateTypeMap: + e.setStateMachineResources(i, state.Iterator) + stateMachine.States[stateName] = state } } } diff --git a/embedded/embedded_test.go b/embedded/embedded_test.go index eba3f9df..3669934c 100644 --- a/embedded/embedded_test.go +++ b/embedded/embedded_test.go @@ -352,6 +352,48 @@ var validateWorkflowDefinitionStatesTests = []validateWorkflowDefinitionStatesTe require.NoError(t, err) }, }, + { + description: "validate map state (invalid inner)", + input: models.WorkflowDefinition{ + StateMachine: &models.SLStateMachine{ + States: map[string]models.SLState{ + "map": models.SLState{ + Type: models.SLStateTypeMap, + Iterator: &models.SLStateMachine{}, + End: true, + }, + }, + }, + }, + assertions: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + description: "validate map state (valid inner)", + input: models.WorkflowDefinition{ + StateMachine: &models.SLStateMachine{ + States: map[string]models.SLState{ + "map": models.SLState{ + Type: models.SLStateTypeMap, + Iterator: &models.SLStateMachine{ + States: map[string]models.SLState{ + "pass": models.SLState{ + End: true, + Type: models.SLStateTypePass, + Result: "result", + }, + }, + }, + End: true, + }, + }, + }, + }, + assertions: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, { description: "invalid state type", input: models.WorkflowDefinition{ From 11592d73737c7c307e3ae337e3ebfeb913a14494 Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Sat, 6 Mar 2021 18:43:43 -0800 Subject: [PATCH 2/2] Bump version (and wag version) --- docs/overview.md | 2 +- gen-go/client/client.go | 37 +++++++++++++++++++------------------ gen-go/client/doer.go | 9 +++++++++ gen-js/index.js | 2 +- gen-js/package.json | 2 +- swagger.yml | 2 +- 6 files changed, 32 insertions(+), 22 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index a455aec1..0ca0eaa6 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions ### Version information -*Version* : 0.13.0 +*Version* : 0.13.1 ### URI scheme diff --git a/gen-go/client/client.go b/gen-go/client/client.go index f244b110..0d5590b3 100644 --- a/gen-go/client/client.go +++ b/gen-go/client/client.go @@ -23,7 +23,7 @@ var _ = strconv.FormatInt var _ = bytes.Compare // Version of the client. -const Version = "0.13.0" +const Version = "0.13.1" // VersionHeader is sent with every request. const VersionHeader = "X-Client-Version" @@ -54,7 +54,8 @@ func New(basePath string) *WagClient { retry := retryDoer{d: tracing, retryPolicy: SingleRetryPolicy{}} logger := logger.New("workflow-manager-wagclient") circuit := &circuitBreakerDoer{ - d: &retry, + d: &retry, + // TODO: INFRANG-4404 allow passing circuitBreakerOptions debug: true, // one circuit for each service + url pair circuitName: fmt.Sprintf("workflow-manager-%s", shortHash(basePath)), @@ -240,7 +241,7 @@ func (c *WagClient) doHealthCheckRequest(ctx context.Context, req *http.Request, return &output default: - return &models.InternalError{Message: "Unknown response"} + return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -347,7 +348,7 @@ func (c *WagClient) doPostStateResourceRequest(ctx context.Context, req *http.Re return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -453,7 +454,7 @@ func (c *WagClient) doDeleteStateResourceRequest(ctx context.Context, req *http. return &output default: - return &models.InternalError{Message: "Unknown response"} + return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -564,7 +565,7 @@ func (c *WagClient) doGetStateResourceRequest(ctx context.Context, req *http.Req return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -677,7 +678,7 @@ func (c *WagClient) doPutStateResourceRequest(ctx context.Context, req *http.Req return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -773,7 +774,7 @@ func (c *WagClient) doGetWorkflowDefinitionsRequest(ctx context.Context, req *ht return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -880,7 +881,7 @@ func (c *WagClient) doNewWorkflowDefinitionRequest(ctx context.Context, req *htt return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -991,7 +992,7 @@ func (c *WagClient) doGetWorkflowDefinitionVersionsByNameRequest(ctx context.Con return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1113,7 +1114,7 @@ func (c *WagClient) doUpdateWorkflowDefinitionRequest(ctx context.Context, req * return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1224,7 +1225,7 @@ func (c *WagClient) doGetWorkflowDefinitionByNameAndVersionRequest(ctx context.C return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1421,7 +1422,7 @@ func (c *WagClient) doGetWorkflowsRequest(ctx context.Context, req *http.Request return nil, "", &output default: - return nil, "", &models.InternalError{Message: "Unknown response"} + return nil, "", &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1537,7 +1538,7 @@ func (c *WagClient) doStartWorkflowRequest(ctx context.Context, req *http.Reques return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1654,7 +1655,7 @@ func (c *WagClient) doCancelWorkflowRequest(ctx context.Context, req *http.Reque return &output default: - return &models.InternalError{Message: "Unknown response"} + return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1765,7 +1766,7 @@ func (c *WagClient) doGetWorkflowByIDRequest(ctx context.Context, req *http.Requ return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -1887,7 +1888,7 @@ func (c *WagClient) doResumeWorkflowByIDRequest(ctx context.Context, req *http.R return nil, &output default: - return nil, &models.InternalError{Message: "Unknown response"} + return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } @@ -2002,7 +2003,7 @@ func (c *WagClient) doResolveWorkflowByIDRequest(ctx context.Context, req *http. return &output default: - return &models.InternalError{Message: "Unknown response"} + return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)} } } diff --git a/gen-go/client/doer.go b/gen-go/client/doer.go index 2b40f00d..8844ccd7 100644 --- a/gen-go/client/doer.go +++ b/gen-go/client/doer.go @@ -277,6 +277,15 @@ func (d *circuitBreakerDoer) init() { if e.Type != "HystrixCommand" { continue } + + // Today we are creating a stream for every client so lets only log events for this + // circuit. In an ideal world we only create a single stream and pass it to the client. + // Lets worry about doing this when we implement passing circuitBreakerOptions + // to disable debug mode + if e.Name != d.circuitName { + continue + } + lastSeen, ok := lastEventSeen[e.Name] lastEventSeen[e.Name] = e diff --git a/gen-js/index.js b/gen-js/index.js index 60574e68..39acaa1c 100644 --- a/gen-js/index.js +++ b/gen-js/index.js @@ -2461,7 +2461,7 @@ module.exports.Errors = Errors; module.exports.DefaultCircuitOptions = defaultCircuitOptions; -const version = "0.13.0"; +const version = "0.13.1"; const versionHeader = "X-Client-Version"; module.exports.Version = version; module.exports.VersionHeader = versionHeader; diff --git a/gen-js/package.json b/gen-js/package.json index 6e101495..48e70424 100644 --- a/gen-js/package.json +++ b/gen-js/package.json @@ -1,6 +1,6 @@ { "name": "workflow-manager", - "version": "0.13.0", + "version": "0.13.1", "description": "Orchestrator for AWS Step Functions", "main": "index.js", "dependencies": { diff --git a/swagger.yml b/swagger.yml index 97bc6c78..0d211637 100644 --- a/swagger.yml +++ b/swagger.yml @@ -4,7 +4,7 @@ info: description: Orchestrator for AWS Step Functions # when changing the version here, make sure to # re-run `make generate` to generate clients and server - version: 0.13.0 + version: 0.13.1 x-npm-package: workflow-manager schemes: - http