From 4681805d1fd6374dd8eb4a262555b3f32626b9bb Mon Sep 17 00:00:00 2001 From: mohit Date: Tue, 10 Oct 2017 08:27:29 +0000 Subject: [PATCH 1/5] use ddb projection expression for summaryOnly=true --- store/dynamodb/dynamodb_store.go | 12 +------ store/dynamodb/workflow.go | 57 +++++++++++++++++++++++++++++--- store/memory/memory_store.go | 2 +- store/tests/tests.go | 2 +- 4 files changed, 55 insertions(+), 18 deletions(-) diff --git a/store/dynamodb/dynamodb_store.go b/store/dynamodb/dynamodb_store.go index 76b07f61..8aff6e40 100644 --- a/store/dynamodb/dynamodb_store.go +++ b/store/dynamodb/dynamodb_store.go @@ -556,7 +556,7 @@ func (d DynamoDB) GetWorkflows(query *store.WorkflowQuery) ([]models.Workflow, s } else { dbQuery, err = ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt{ WorkflowDefinitionName: query.DefinitionName, - }.ConstructQuery() + }.ConstructQuery(query.SummaryOnly) } if err != nil { return workflows, nextPageToken, err @@ -585,16 +585,6 @@ func (d DynamoDB) GetWorkflows(query *store.WorkflowQuery) ([]models.Workflow, s return workflows, nextPageToken, err } - // TODO: Optimization - use a projection expression instead to limit which fields are returned - // by the dynamodb query. - if query.SummaryOnly { - workflow.Jobs = []*models.Job{} - workflow.WorkflowDefinition = &models.WorkflowDefinition{ - Name: workflow.WorkflowDefinition.Name, - Version: workflow.WorkflowDefinition.Version, - } - } - workflows = append(workflows, workflow) } diff --git a/store/dynamodb/workflow.go b/store/dynamodb/workflow.go index 2d71fef8..1a42ddb2 100644 --- a/store/dynamodb/workflow.go +++ b/store/dynamodb/workflow.go @@ -2,6 +2,7 @@ package dynamodb import ( "fmt" + "strings" "github.com/Clever/workflow-manager/gen-go/models" "github.com/Clever/workflow-manager/store" @@ -11,6 +12,32 @@ import ( "github.com/go-openapi/strfmt" ) +// SummaryKeys are json paths to the Workflow fields we want to pull out of dynamodb +// when summaryOnly=true in WorkflowQuery +var SummaryKeys = []string{ + "Workflow.createdAt", + "Workflow.id", + "Workflow.#I", // input + "Workflow.lastUpdated", + "Workflow.queue", + "Workflow.namespace", + "Workflow.retries", + "Workflow.retryFor", + "Workflow.#S", // status + "Workflow.statusReason", + "Workflow.tags", + + "Workflow.workflowDefinition.#N", + "Workflow.workflowDefinition.version", +} + +var summaryProjectionExpression = strings.Join(SummaryKeys, ", ") +var summaryExpressionAttributeNames = map[string]*string{ + "#S": aws.String("status"), + "#I": aws.String("input"), + "#N": aws.String("name"), +} + // ddbWorkflow represents the workflow as stored in dynamo. // Use this to make PutItem queries. type ddbWorkflow struct { @@ -103,12 +130,13 @@ func (sk ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt) AttributeDefinition } } -func (sk ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt) ConstructQuery() (*dynamodb.QueryInput, error) { +func (sk ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt) ConstructQuery(summaryOnly bool) (*dynamodb.QueryInput, error) { workflowNameAV, err := dynamodbattribute.Marshal(sk.WorkflowDefinitionName) if err != nil { return nil, fmt.Errorf("could not marshal workflow definition name: %s", err) } - return &dynamodb.QueryInput{ + + queryInput := &dynamodb.QueryInput{ IndexName: aws.String(sk.Name()), ExpressionAttributeNames: map[string]*string{ "#W": aws.String("_gsi-wn"), @@ -117,7 +145,13 @@ func (sk ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt) ConstructQuery() (* ":workflowName": workflowNameAV, }, KeyConditionExpression: aws.String("#W = :workflowName"), - }, nil + } + + if summaryOnly { + onlySummaryFields(queryInput) + } + + return queryInput, nil } func (sk ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt) KeySchema() []*dynamodb.KeySchemaElement { @@ -167,7 +201,7 @@ func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) ConstructQuery( return nil, fmt.Errorf("workflow status filter is required for %s index", sk.Name()) } - return &dynamodb.QueryInput{ + queryInput := &dynamodb.QueryInput{ IndexName: aws.String(sk.Name()), ExpressionAttributeNames: map[string]*string{ "#WS": aws.String("_gsi-wn-and-status"), @@ -180,7 +214,13 @@ func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) ConstructQuery( }, }, KeyConditionExpression: aws.String("#WS = :workflowNameAndStatus"), - }, nil + } + + if query.SummaryOnly { + onlySummaryFields(queryInput) + } + + return queryInput, nil } func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) KeySchema() []*dynamodb.KeySchemaElement { @@ -249,3 +289,10 @@ func (sk ddbWorkflowSecondaryKeyStatusLastUpdated) KeySchema() []*dynamodb.KeySc }, } } + +func onlySummaryFields(queryInput *dynamodb.QueryInput) { + queryInput = queryInput.SetProjectionExpression(summaryProjectionExpression) + for name, exp := range summaryExpressionAttributeNames { + queryInput.ExpressionAttributeNames[name] = exp + } +} diff --git a/store/memory/memory_store.go b/store/memory/memory_store.go index b2d6f48e..cf85fbe3 100644 --- a/store/memory/memory_store.go +++ b/store/memory/memory_store.go @@ -167,7 +167,7 @@ func (s MemoryStore) GetWorkflows( for _, workflow := range s.workflows { if s.matchesQuery(workflow, query) { if query.SummaryOnly { - workflow.Jobs = []*models.Job{} + workflow.Jobs = nil workflow.WorkflowDefinition = &models.WorkflowDefinition{ Name: workflow.WorkflowDefinition.Name, Version: workflow.WorkflowDefinition.Version, diff --git a/store/tests/tests.go b/store/tests/tests.go index fe4a61bf..da313995 100644 --- a/store/tests/tests.go +++ b/store/tests/tests.go @@ -289,7 +289,7 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { Limit: 10, }) require.NoError(t, err) - require.Equal(t, []*models.Job{}, workflows[0].Jobs) + require.Nil(t, workflows[0].Jobs) definitionSummary := &models.WorkflowDefinition{ Name: definition.Name, From a72819e5ea5d543dc33169f158cff6ba3188c743 Mon Sep 17 00:00:00 2001 From: mohit Date: Thu, 12 Oct 2017 21:49:25 +0000 Subject: [PATCH 2/5] generate models.WorkflowQuery; add WorkflowSummary --- executor/log_helpers_test.go | 12 +++- gen-go/models/inputs.go | 10 ++- gen-go/models/workflow.go | 95 +----------------------- gen-go/models/workflow_query.go | 97 +++++++++++++++++++++++++ gen-go/models/workflow_summary.go | 115 ++++++++++++++++++++++++++++++ gen-go/server/handlers.go | 7 +- gen-js/README.md | 54 +++++++------- gen-js/index.js | 4 +- handler.go | 56 +++++++-------- resources/workflows.go | 26 +++---- store/dynamodb/dynamodb_store.go | 9 +-- store/dynamodb/workflow.go | 9 +-- store/memory/memory_store.go | 13 ++-- store/store.go | 12 +--- store/tests/tests.go | 68 +++++++++++------- swagger.yml | 45 ++++++++++-- 16 files changed, 403 insertions(+), 229 deletions(-) create mode 100644 gen-go/models/workflow_query.go create mode 100644 gen-go/models/workflow_summary.go diff --git a/executor/log_helpers_test.go b/executor/log_helpers_test.go index 47d83a33..13e1e6c6 100644 --- a/executor/log_helpers_test.go +++ b/executor/log_helpers_test.go @@ -25,14 +25,22 @@ func TestDataResultsRouting(t *testing.T) { assert.Equal(0, len(counts)) t.Log("matches 'job-status-alerts'") - logJobStatus(&models.Job{}, &models.Workflow{WorkflowDefinition: &models.WorkflowDefinition{}}) + logJobStatus(&models.Job{}, &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + WorkflowDefinition: &models.WorkflowDefinition{}, + }}, + ) counts = mocklog.RuleCounts() assert.Equal(1, len(counts)) assert.Contains(counts, "job-status-alerts") assert.Equal(counts["job-status-alerts"], 1) t.Log("matches 'workflow-status-change'") - logWorkflowStatusChange(&models.Workflow{WorkflowDefinition: &models.WorkflowDefinition{}}, models.WorkflowStatusRunning) + logWorkflowStatusChange(&models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + WorkflowDefinition: &models.WorkflowDefinition{}, + }}, + models.WorkflowStatusRunning) counts = mocklog.RuleCounts() assert.Equal(3, len(counts)) assert.Contains(counts, "workflow-status-metrics") diff --git a/gen-go/models/inputs.go b/gen-go/models/inputs.go index fa3acebd..e0988a98 100644 --- a/gen-go/models/inputs.go +++ b/gen-go/models/inputs.go @@ -286,7 +286,7 @@ func (i GetWorkflowDefinitionByNameAndVersionInput) Path() (string, error) { // GetWorkflowsInput holds the input parameters for a getWorkflows operation. type GetWorkflowsInput struct { - Limit *int32 + Limit *int64 OldestFirst *bool PageToken *string Status *string @@ -298,6 +298,12 @@ type GetWorkflowsInput struct { // requirements from the swagger yml file. func (i GetWorkflowsInput) Validate() error { + if i.Limit != nil { + if err := validate.MaximumInt("limit", "query", *i.Limit, int64(10000), false); err != nil { + return err + } + } + return nil } @@ -307,7 +313,7 @@ func (i GetWorkflowsInput) Path() (string, error) { urlVals := url.Values{} if i.Limit != nil { - urlVals.Add("limit", strconv.FormatInt(int64(*i.Limit), 10)) + urlVals.Add("limit", strconv.FormatInt(*i.Limit, 10)) } if i.OldestFirst != nil { diff --git a/gen-go/models/workflow.go b/gen-go/models/workflow.go index b64d57f4..cffdd25b 100644 --- a/gen-go/models/workflow.go +++ b/gen-go/models/workflow.go @@ -13,68 +13,21 @@ import ( // Workflow workflow // swagger:model Workflow type Workflow struct { - - // created at - CreatedAt strfmt.DateTime `json:"createdAt,omitempty"` - - // id - ID string `json:"id,omitempty"` - - // input - Input string `json:"input,omitempty"` + WorkflowSummary // jobs Jobs []*Job `json:"jobs"` - - // last updated - LastUpdated strfmt.DateTime `json:"lastUpdated,omitempty"` - - // namespace - Namespace string `json:"namespace,omitempty"` - - // queue - Queue string `json:"queue,omitempty"` - - // workflow-id's of workflows created as retries for this workflow - Retries []string `json:"retries"` - - // workflow-id of original workflow in case this is a retry - RetryFor string `json:"retryFor,omitempty"` - - // status - Status WorkflowStatus `json:"status,omitempty"` - - // status reason - StatusReason string `json:"statusReason,omitempty"` - - // tags: object with key-value pairs; keys and values should be strings - Tags map[string]interface{} `json:"tags,omitempty"` - - // workflow definition - WorkflowDefinition *WorkflowDefinition `json:"workflowDefinition,omitempty"` } // Validate validates this workflow func (m *Workflow) Validate(formats strfmt.Registry) error { var res []error - if err := m.validateJobs(formats); err != nil { - // prop - res = append(res, err) - } - - if err := m.validateRetries(formats); err != nil { - // prop + if err := m.WorkflowSummary.Validate(formats); err != nil { res = append(res, err) } - if err := m.validateStatus(formats); err != nil { - // prop - res = append(res, err) - } - - if err := m.validateWorkflowDefinition(formats); err != nil { - // prop + if err := m.validateJobs(formats); err != nil { res = append(res, err) } @@ -86,10 +39,6 @@ func (m *Workflow) Validate(formats strfmt.Registry) error { func (m *Workflow) validateJobs(formats strfmt.Registry) error { - if swag.IsZero(m.Jobs) { // not required - return nil - } - for i := 0; i < len(m.Jobs); i++ { if swag.IsZero(m.Jobs[i]) { // not required @@ -107,41 +56,3 @@ func (m *Workflow) validateJobs(formats strfmt.Registry) error { return nil } - -func (m *Workflow) validateRetries(formats strfmt.Registry) error { - - if swag.IsZero(m.Retries) { // not required - return nil - } - - return nil -} - -func (m *Workflow) validateStatus(formats strfmt.Registry) error { - - if swag.IsZero(m.Status) { // not required - return nil - } - - if err := m.Status.Validate(formats); err != nil { - return err - } - - return nil -} - -func (m *Workflow) validateWorkflowDefinition(formats strfmt.Registry) error { - - if swag.IsZero(m.WorkflowDefinition) { // not required - return nil - } - - if m.WorkflowDefinition != nil { - - if err := m.WorkflowDefinition.Validate(formats); err != nil { - return err - } - } - - return nil -} diff --git a/gen-go/models/workflow_query.go b/gen-go/models/workflow_query.go new file mode 100644 index 00000000..567792ba --- /dev/null +++ b/gen-go/models/workflow_query.go @@ -0,0 +1,97 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +// WorkflowQuery workflow query +// swagger:model WorkflowQuery +type WorkflowQuery struct { + + // limit + // Maximum: 10000 + Limit int64 `json:"limit,omitempty"` + + // oldest first + OldestFirst bool `json:"oldestFirst,omitempty"` + + // page token + PageToken string `json:"pageToken,omitempty"` + + // status + Status WorkflowStatus `json:"status,omitempty"` + + // summary only + SummaryOnly *bool `json:"summaryOnly,omitempty"` + + // workflow definition name + // Required: true + WorkflowDefinitionName *string `json:"workflowDefinitionName"` +} + +// Validate validates this workflow query +func (m *WorkflowQuery) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateLimit(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validateStatus(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validateWorkflowDefinitionName(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *WorkflowQuery) validateLimit(formats strfmt.Registry) error { + + if swag.IsZero(m.Limit) { // not required + return nil + } + + if err := validate.MaximumInt("limit", "body", int64(m.Limit), 10000, false); err != nil { + return err + } + + return nil +} + +func (m *WorkflowQuery) validateStatus(formats strfmt.Registry) error { + + if swag.IsZero(m.Status) { // not required + return nil + } + + if err := m.Status.Validate(formats); err != nil { + return err + } + + return nil +} + +func (m *WorkflowQuery) validateWorkflowDefinitionName(formats strfmt.Registry) error { + + if err := validate.Required("workflowDefinitionName", "body", m.WorkflowDefinitionName); err != nil { + return err + } + + return nil +} diff --git a/gen-go/models/workflow_summary.go b/gen-go/models/workflow_summary.go new file mode 100644 index 00000000..19f9ad35 --- /dev/null +++ b/gen-go/models/workflow_summary.go @@ -0,0 +1,115 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + + "github.com/go-openapi/errors" +) + +// WorkflowSummary workflow summary +// swagger:model WorkflowSummary +type WorkflowSummary struct { + + // created at + CreatedAt strfmt.DateTime `json:"createdAt,omitempty"` + + // id + ID string `json:"id,omitempty"` + + // input + Input string `json:"input,omitempty"` + + // last updated + LastUpdated strfmt.DateTime `json:"lastUpdated,omitempty"` + + // namespace + Namespace string `json:"namespace,omitempty"` + + // queue + Queue string `json:"queue,omitempty"` + + // workflow-id's of workflows created as retries for this workflow + Retries []string `json:"retries"` + + // workflow-id of original workflow in case this is a retry + RetryFor string `json:"retryFor,omitempty"` + + // status + Status WorkflowStatus `json:"status,omitempty"` + + // status reason + StatusReason string `json:"statusReason,omitempty"` + + // tags: object with key-value pairs; keys and values should be strings + Tags map[string]interface{} `json:"tags,omitempty"` + + // workflow definition + WorkflowDefinition *WorkflowDefinition `json:"workflowDefinition,omitempty"` +} + +// Validate validates this workflow summary +func (m *WorkflowSummary) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateRetries(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validateStatus(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validateWorkflowDefinition(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *WorkflowSummary) validateRetries(formats strfmt.Registry) error { + + if swag.IsZero(m.Retries) { // not required + return nil + } + + return nil +} + +func (m *WorkflowSummary) validateStatus(formats strfmt.Registry) error { + + if swag.IsZero(m.Status) { // not required + return nil + } + + if err := m.Status.Validate(formats); err != nil { + return err + } + + return nil +} + +func (m *WorkflowSummary) validateWorkflowDefinition(formats strfmt.Registry) error { + + if swag.IsZero(m.WorkflowDefinition) { // not required + return nil + } + + if m.WorkflowDefinition != nil { + + if err := m.WorkflowDefinition.Validate(formats); err != nil { + return err + } + } + + return nil +} diff --git a/gen-go/server/handlers.go b/gen-go/server/handlers.go index fd318b93..cce423ad 100644 --- a/gen-go/server/handlers.go +++ b/gen-go/server/handlers.go @@ -1216,10 +1216,13 @@ func newGetWorkflowsInput(r *http.Request) (*models.GetWorkflowsInput, error) { limitStrs := r.URL.Query()["limit"] + if len(limitStrs) == 0 { + limitStrs = []string{"10"} + } if len(limitStrs) > 0 { - var limitTmp int32 + var limitTmp int64 limitStr := limitStrs[0] - limitTmp, err = swag.ConvertInt32(limitStr) + limitTmp, err = swag.ConvertInt64(limitStr) if err != nil { return nil, err } diff --git a/gen-js/README.md b/gen-js/README.md index c50ef190..79766303 100644 --- a/gen-js/README.md +++ b/gen-js/README.md @@ -290,20 +290,20 @@ Get the latest versions of all available WorkflowDefinitions **Reject**: [InternalError](#module_workflow-manager--WorkflowManager.Errors.InternalError) **Reject**: Error -| Param | Type | Description | -| --- | --- | --- | -| params | Object | | -| [params.limit] | number | Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. | -| [params.oldestFirst] | boolean | | -| [params.pageToken] | string | | -| [params.status] | string | | -| [params.summaryOnly] | boolean | Limits workflow data to the bare minimum - omits the full workflow definition and job data. | -| params.workflowDefinitionName | string | | -| [options] | object | | -| [options.timeout] | number | A request specific timeout | -| [options.span] | [Span](https://doc.esdoc.org/github.com/opentracing/opentracing-javascript/class/src/span.js~Span.html) | An OpenTracing span - For example from the parent request | -| [options.retryPolicy] | [RetryPolicies](#module_workflow-manager--WorkflowManager.RetryPolicies) | A request specific retryPolicy | -| [cb] | function | | +| Param | Type | Default | Description | +| --- | --- | --- | --- | +| params | Object | | | +| [params.limit] | number | 10 | Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. | +| [params.oldestFirst] | boolean | | | +| [params.pageToken] | string | | | +| [params.status] | string | | | +| [params.summaryOnly] | boolean | | Limits workflow data to the bare minimum - omits the full workflow definition and job data. | +| params.workflowDefinitionName | string | | | +| [options] | object | | | +| [options.timeout] | number | | A request specific timeout | +| [options.span] | [Span](https://doc.esdoc.org/github.com/opentracing/opentracing-javascript/class/src/span.js~Span.html) | | An OpenTracing span - For example from the parent request | +| [options.retryPolicy] | [RetryPolicies](#module_workflow-manager--WorkflowManager.RetryPolicies) | | A request specific retryPolicy | +| [cb] | function | | | @@ -311,19 +311,19 @@ Get the latest versions of all available WorkflowDefinitions **Kind**: instance method of [WorkflowManager](#exp_module_workflow-manager--WorkflowManager) **Returns**: Object - iterfunction - iter.map - takes in a function, applies it to each resource, and returns a promise to the result as an arrayfunction - iter.toArray - returns a promise to the resources as an arrayfunction - iter.forEach - takes in a function, applies it to each resource -| Param | Type | Description | -| --- | --- | --- | -| params | Object | | -| [params.limit] | number | Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. | -| [params.oldestFirst] | boolean | | -| [params.pageToken] | string | | -| [params.status] | string | | -| [params.summaryOnly] | boolean | Limits workflow data to the bare minimum - omits the full workflow definition and job data. | -| params.workflowDefinitionName | string | | -| [options] | object | | -| [options.timeout] | number | A request specific timeout | -| [options.span] | [Span](https://doc.esdoc.org/github.com/opentracing/opentracing-javascript/class/src/span.js~Span.html) | An OpenTracing span - For example from the parent request | -| [options.retryPolicy] | [RetryPolicies](#module_workflow-manager--WorkflowManager.RetryPolicies) | A request specific retryPolicy | +| Param | Type | Default | Description | +| --- | --- | --- | --- | +| params | Object | | | +| [params.limit] | number | 10 | Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. | +| [params.oldestFirst] | boolean | | | +| [params.pageToken] | string | | | +| [params.status] | string | | | +| [params.summaryOnly] | boolean | | Limits workflow data to the bare minimum - omits the full workflow definition and job data. | +| params.workflowDefinitionName | string | | | +| [options] | object | | | +| [options.timeout] | number | | A request specific timeout | +| [options.span] | [Span](https://doc.esdoc.org/github.com/opentracing/opentracing-javascript/class/src/span.js~Span.html) | | An OpenTracing span - For example from the parent request | +| [options.retryPolicy] | [RetryPolicies](#module_workflow-manager--WorkflowManager.RetryPolicies) | | A request specific retryPolicy | diff --git a/gen-js/index.js b/gen-js/index.js index 6c6a1faa..ab021793 100644 --- a/gen-js/index.js +++ b/gen-js/index.js @@ -1461,7 +1461,7 @@ class WorkflowManager { /** * @param {Object} params - * @param {number} [params.limit] - Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. + * @param {number} [params.limit=10] - Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. * @param {boolean} [params.oldestFirst] * @param {string} [params.pageToken] * @param {string} [params.status] @@ -1610,7 +1610,7 @@ class WorkflowManager { /** * @param {Object} params - * @param {number} [params.limit] - Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. + * @param {number} [params.limit=10] - Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. * @param {boolean} [params.oldestFirst] * @param {string} [params.pageToken] * @param {string} [params.status] diff --git a/handler.go b/handler.go index d398fca0..f217886f 100644 --- a/handler.go +++ b/handler.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/go-openapi/swag" + "github.com/aws/aws-sdk-go/aws" "github.com/Clever/workflow-manager/executor" "github.com/Clever/workflow-manager/gen-go/models" @@ -12,15 +12,6 @@ import ( "github.com/Clever/workflow-manager/store" ) -const ( - // WorkflowsPageSizeMax defines the default page size for workflow queries. - // TODO: This can be bumped up a bit once ark is updated to use the `limit` query param. - WorkflowsPageSizeDefault int = 10 - - // WorkflowsPageSizeMax defines the maximum allowed page size limit for workflow queries. - WorkflowsPageSizeMax int = 10000 -) - // Handler implements the wag Controller type Handler struct { store store.Store @@ -182,25 +173,16 @@ func (h Handler) GetWorkflows( ctx context.Context, input *models.GetWorkflowsInput, ) ([]models.Workflow, string, error) { - limit := WorkflowsPageSizeDefault - if input.Limit != nil && *input.Limit > 0 { - limit = int(*input.Limit) - } - if limit > WorkflowsPageSizeMax { - limit = WorkflowsPageSizeMax - } - - workflows, nextPageToken, err := h.store.GetWorkflows(&store.WorkflowQuery{ - DefinitionName: input.WorkflowDefinitionName, - Limit: limit, - OldestFirst: swag.BoolValue(input.OldestFirst), - PageToken: swag.StringValue(input.PageToken), - Status: swag.StringValue(input.Status), - SummaryOnly: swag.BoolValue(input.SummaryOnly), - }) + + workflowQuery, err := paramsToWorkflowsQuery(input) + if err != nil { + return []models.Workflow{}, "", err + } + + workflows, nextPageToken, err := h.store.GetWorkflows(workflowQuery) if err != nil { if _, ok := err.(store.InvalidPageTokenError); ok { - return []models.Workflow{}, "", models.BadRequest{ + return workflows, "", models.BadRequest{ Message: err.Error(), } } @@ -208,11 +190,23 @@ func (h Handler) GetWorkflows( return []models.Workflow{}, "", err } - results := []models.Workflow{} - for _, workflow := range workflows { - results = append(results, workflow) + return workflows, nextPageToken, nil +} + +func paramsToWorkflowsQuery(input *models.GetWorkflowsInput) (*models.WorkflowQuery, error) { + query := &models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(input.WorkflowDefinitionName), + Limit: aws.Int64Value(input.Limit), + OldestFirst: aws.BoolValue(input.OldestFirst), + PageToken: aws.StringValue(input.PageToken), + Status: models.WorkflowStatus(aws.StringValue(input.Status)), + SummaryOnly: input.SummaryOnly, + } + + if err := query.Validate(nil); err != nil { + return nil, err } - return results, nextPageToken, nil + return query, nil } // GetWorkflowByID returns current details about a Workflow with the given workflowId diff --git a/resources/workflows.go b/resources/workflows.go index 974c88fe..97f3b666 100644 --- a/resources/workflows.go +++ b/resources/workflows.go @@ -11,18 +11,20 @@ import ( // NewWorkflow creates a new Workflow struct for a WorkflowDefinition func NewWorkflow(wfd *models.WorkflowDefinition, input string, namespace string, queue string, tags map[string]interface{}) *models.Workflow { return &models.Workflow{ - ID: uuid.NewV4().String(), - CreatedAt: strfmt.DateTime(time.Now()), - LastUpdated: strfmt.DateTime(time.Now()), - WorkflowDefinition: wfd, - Status: models.WorkflowStatusQueued, - Jobs: []*models.Job{}, - Namespace: namespace, - Queue: queue, - Input: input, - Tags: tags, - Retries: []string{}, - RetryFor: "", + WorkflowSummary: models.WorkflowSummary{ + ID: uuid.NewV4().String(), + CreatedAt: strfmt.DateTime(time.Now()), + LastUpdated: strfmt.DateTime(time.Now()), + WorkflowDefinition: wfd, + Status: models.WorkflowStatusQueued, + Namespace: namespace, + Queue: queue, + Input: input, + Tags: tags, + Retries: []string{}, + RetryFor: "", + }, + Jobs: []*models.Job{}, } } diff --git a/store/dynamodb/dynamodb_store.go b/store/dynamodb/dynamodb_store.go index 8aff6e40..0923f200 100644 --- a/store/dynamodb/dynamodb_store.go +++ b/store/dynamodb/dynamodb_store.go @@ -545,9 +545,10 @@ func (d DynamoDB) GetWorkflowByID(id string) (models.Workflow, error) { } // GetWorkflows returns all workflows matching the given query. -func (d DynamoDB) GetWorkflows(query *store.WorkflowQuery) ([]models.Workflow, string, error) { +func (d DynamoDB) GetWorkflows(query *models.WorkflowQuery) ([]models.Workflow, string, error) { var workflows []models.Workflow nextPageToken := "" + summaryOnly := aws.BoolValue(query.SummaryOnly) var dbQuery *dynamodb.QueryInput var err error @@ -555,15 +556,15 @@ func (d DynamoDB) GetWorkflows(query *store.WorkflowQuery) ([]models.Workflow, s dbQuery, err = ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt{}.ConstructQuery(query) } else { dbQuery, err = ddbWorkflowSecondaryKeyWorkflowDefinitionCreatedAt{ - WorkflowDefinitionName: query.DefinitionName, - }.ConstructQuery(query.SummaryOnly) + WorkflowDefinitionName: aws.StringValue(query.WorkflowDefinitionName), + }.ConstructQuery(summaryOnly) } if err != nil { return workflows, nextPageToken, err } dbQuery.TableName = aws.String(d.workflowsTable()) - dbQuery.Limit = aws.Int64(int64(query.Limit)) + dbQuery.Limit = aws.Int64(query.Limit) dbQuery.ScanIndexForward = aws.Bool(query.OldestFirst) pageKey, err := ParsePageKey(query.PageToken) diff --git a/store/dynamodb/workflow.go b/store/dynamodb/workflow.go index 1a42ddb2..9322d9d2 100644 --- a/store/dynamodb/workflow.go +++ b/store/dynamodb/workflow.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/Clever/workflow-manager/gen-go/models" - "github.com/Clever/workflow-manager/store" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" @@ -195,7 +194,7 @@ func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) getDefinitionStatusPa } func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) ConstructQuery( - query *store.WorkflowQuery, + query *models.WorkflowQuery, ) (*dynamodb.QueryInput, error) { if query.Status == "" { return nil, fmt.Errorf("workflow status filter is required for %s index", sk.Name()) @@ -209,14 +208,16 @@ func (sk ddbWorkflowSecondaryKeyDefinitionStatusCreatedAt) ConstructQuery( ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ ":workflowNameAndStatus": &dynamodb.AttributeValue{ S: aws.String( - sk.getDefinitionStatusPair(query.DefinitionName, query.Status), + sk.getDefinitionStatusPair( + aws.StringValue(query.WorkflowDefinitionName), + string(query.Status)), ), }, }, KeyConditionExpression: aws.String("#WS = :workflowNameAndStatus"), } - if query.SummaryOnly { + if aws.BoolValue(query.SummaryOnly) { onlySummaryFields(queryInput) } diff --git a/store/memory/memory_store.go b/store/memory/memory_store.go index cf85fbe3..3f7c5007 100644 --- a/store/memory/memory_store.go +++ b/store/memory/memory_store.go @@ -5,6 +5,7 @@ import ( "sort" "time" + "github.com/aws/aws-sdk-go/aws" "github.com/go-openapi/strfmt" "github.com/satori/go.uuid" @@ -161,12 +162,12 @@ func (s MemoryStore) UpdateWorkflow(workflow models.Workflow) error { } func (s MemoryStore) GetWorkflows( - query *store.WorkflowQuery, + query *models.WorkflowQuery, ) ([]models.Workflow, string, error) { workflows := []models.Workflow{} for _, workflow := range s.workflows { if s.matchesQuery(workflow, query) { - if query.SummaryOnly { + if aws.BoolValue(query.SummaryOnly) { workflow.Jobs = nil workflow.WorkflowDefinition = &models.WorkflowDefinition{ Name: workflow.WorkflowDefinition.Name, @@ -198,7 +199,7 @@ func (s MemoryStore) GetWorkflows( } } - rangeEnd := rangeStart + query.Limit + rangeEnd := rangeStart + int(query.Limit) if rangeEnd > len(workflows) { rangeEnd = len(workflows) } @@ -210,12 +211,12 @@ func (s MemoryStore) GetWorkflows( return workflows[rangeStart:rangeEnd], nextPageToken, nil } -func (s MemoryStore) matchesQuery(workflow models.Workflow, query *store.WorkflowQuery) bool { - if workflow.WorkflowDefinition.Name != query.DefinitionName { +func (s MemoryStore) matchesQuery(workflow models.Workflow, query *models.WorkflowQuery) bool { + if workflow.WorkflowDefinition.Name != aws.StringValue(query.WorkflowDefinitionName) { return false } - if query.Status != "" && string(workflow.Status) != query.Status { + if query.Status != "" && workflow.Status != query.Status { return false } diff --git a/store/store.go b/store/store.go index 759db5e1..0fb33044 100644 --- a/store/store.go +++ b/store/store.go @@ -23,22 +23,12 @@ type Store interface { SaveWorkflow(workflow models.Workflow) error UpdateWorkflow(workflow models.Workflow) error GetWorkflowByID(id string) (models.Workflow, error) - GetWorkflows(query *WorkflowQuery) ([]models.Workflow, string, error) + GetWorkflows(query *models.WorkflowQuery) ([]models.Workflow, string, error) GetPendingWorkflowIDs() ([]string, error) LockWorkflow(id string) error UnlockWorkflow(id string) error } -// WorkflowQuery contains filtering options for workflow queries. -type WorkflowQuery struct { - DefinitionName string - Limit int - OldestFirst bool - PageToken string - Status string - SummaryOnly bool -} - // ErrWorkflowLocked is returned from LockWorfklow in the case of the workflow already being locked. var ErrWorkflowLocked = errors.New("workflow already locked") diff --git a/store/tests/tests.go b/store/tests/tests.go index da313995..de8d8e13 100644 --- a/store/tests/tests.go +++ b/store/tests/tests.go @@ -1,9 +1,12 @@ package tests import ( + "fmt" + "reflect" "testing" "time" + "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" "github.com/Clever/workflow-manager/gen-go/models" @@ -203,13 +206,15 @@ func GetWorkflowByID(s store.Store, t *testing.T) func(t *testing.T) { require.Nil(t, err) expected := models.Workflow{ - ID: workflow.ID, - WorkflowDefinition: wf, - Input: `["input"]`, - Status: models.WorkflowStatusQueued, - Namespace: "namespace", - Queue: "queue", - Tags: tags, + WorkflowSummary: models.WorkflowSummary{ + ID: workflow.ID, + WorkflowDefinition: wf, + Input: `["input"]`, + Status: models.WorkflowStatusQueued, + Namespace: "namespace", + Queue: "queue", + Tags: tags, + }, } require.Equal(t, savedWorkflow.Input, expected.Input) require.Equal(t, savedWorkflow.Status, expected.Status) @@ -248,9 +253,9 @@ func GetWorkflows(s store.Store, t *testing.T) func(t *testing.T) { require.NoError(t, s.SaveWorkflow(*otherDefinitionWorkflow)) // Verify results for query with no status filtering: - workflows, _, err := s.GetWorkflows(&store.WorkflowQuery{ - DefinitionName: definition.Name, - Limit: 10, + workflows, _, err := s.GetWorkflows(&models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(definition.Name), + Limit: 10, }) require.NoError(t, err) require.Len(t, workflows, 2) @@ -258,10 +263,10 @@ func GetWorkflows(s store.Store, t *testing.T) func(t *testing.T) { require.Equal(t, runningWorkflow.ID, workflows[1].ID) // Verify results for query with status filtering: - workflows, _, err = s.GetWorkflows(&store.WorkflowQuery{ - DefinitionName: definition.Name, - Status: string(models.WorkflowStatusRunning), - Limit: 10, + workflows, _, err = s.GetWorkflows(&models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(definition.Name), + Status: models.WorkflowStatusRunning, + Limit: 10, }) require.NoError(t, err) require.Len(t, workflows, 1) @@ -283,10 +288,10 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { require.NoError(t, s.SaveWorkflow(*workflow)) // Verify details are excluded if SummaryOnly == true: - workflows, _, err := s.GetWorkflows(&store.WorkflowQuery{ - DefinitionName: definition.Name, - SummaryOnly: true, - Limit: 10, + workflows, _, err := s.GetWorkflows(&models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(definition.Name), + SummaryOnly: aws.Bool(true), + Limit: 10, }) require.NoError(t, err) require.Nil(t, workflows[0].Jobs) @@ -297,11 +302,20 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { } require.Equal(t, definitionSummary, workflows[0].WorkflowDefinition) + // All fields of WorkflowSummary should be present + wsType := reflect.TypeOf(models.WorkflowSummary{}) + workflowVal := reflect.ValueOf(workflows[0]) + for i := 0; i < wsType.NumField(); i++ { + name := wsType.Field(i).Name + fmt.Println(i, workflowVal.FieldByName(name).String()) + assert.NotNil(t, workflowVal.FieldByName(name).String()) + } + // Verify details are included if SummaryOnly == false: - workflows, _, err = s.GetWorkflows(&store.WorkflowQuery{ - DefinitionName: definition.Name, - SummaryOnly: false, - Limit: 10, + workflows, _, err = s.GetWorkflows(&models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(definition.Name), + SummaryOnly: aws.Bool(false), + Limit: 10, }) require.NoError(t, err) require.Equal(t, workflow.Jobs, workflows[0].Jobs) @@ -333,7 +347,7 @@ func GetWorkflowsPagination(s store.Store, t *testing.T) func(t *testing.T) { require.NoError(t, s.SaveWorkflow(*workflow3)) limit := 1 - getAllPages := func(query store.WorkflowQuery) []models.Workflow { + getAllPages := func(query models.WorkflowQuery) []models.Workflow { nextPageToken := "" nextQuery := query workflows := []models.Workflow{} @@ -361,10 +375,10 @@ func GetWorkflowsPagination(s store.Store, t *testing.T) func(t *testing.T) { return workflows } - query := store.WorkflowQuery{ - DefinitionName: definition.Name, - Limit: limit, - Status: string(models.WorkflowStatusRunning), + query := models.WorkflowQuery{ + WorkflowDefinitionName: aws.String(definition.Name), + Limit: int64(limit), + Status: models.WorkflowStatusRunning, } workflows := getAllPages(query) diff --git a/swagger.yml b/swagger.yml index f9f81c31..de2f3a0d 100644 --- a/swagger.yml +++ b/swagger.yml @@ -149,14 +149,16 @@ paths: pageParameter: pageToken parameters: # TODO: Add date range filters. + # should be kept in sync with WorkflowQuery model - name: limit + default: 10 # TODO: this can be increased after ark support + maximum: 10000 + in: query + type: integer description: Maximum number of workflows to return. Defaults to 10. Restricted to a max of 10,000. - in: query - type: integer - format: int32 - name: oldestFirst in: query type: boolean @@ -371,6 +373,16 @@ definitions: - "step-functions" Workflow: + allOf: + - $ref: '#/definitions/WorkflowSummary' + - type: object + properties: + jobs: + type: array + items: + $ref: '#/definitions/Job' + + WorkflowSummary: type: object properties: id: @@ -387,10 +399,6 @@ definitions: $ref: '#/definitions/WorkflowStatus' statusReason: type: string - jobs: - type: array - items: - $ref: '#/definitions/Job' namespace: type: string queue: @@ -411,6 +419,7 @@ definitions: additionalProperties: type: object + WorkflowStatus: type: string enum: @@ -634,3 +643,25 @@ definitions: properties: StartAt: type: string + + # Should be kept in sync with getWorkflows API + WorkflowQuery: + type: object + required: + - workflowDefinitionName + properties: + workflowDefinitionName: + type: string + limit: + type: integer + default: 10 + maximum: 10000 + oldestFirst: + type: boolean + pageToken: + type: string + status: + $ref: '#/definitions/WorkflowStatus' + summaryOnly: + type: boolean + default: false From 59fa5f1b4e12998c50b2761cf300434cd9e2ded8 Mon Sep 17 00:00:00 2001 From: mohit Date: Fri, 13 Oct 2017 02:03:37 +0000 Subject: [PATCH 3/5] add comment --- store/dynamodb/workflow.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/dynamodb/workflow.go b/store/dynamodb/workflow.go index 9322d9d2..a08efd05 100644 --- a/store/dynamodb/workflow.go +++ b/store/dynamodb/workflow.go @@ -13,6 +13,7 @@ import ( // SummaryKeys are json paths to the Workflow fields we want to pull out of dynamodb // when summaryOnly=true in WorkflowQuery +// This should be kept in sync with the WorkflowSummary model defined in swagger var SummaryKeys = []string{ "Workflow.createdAt", "Workflow.id", From 0cf8a717b6a78054bffd1fc5ff63fa345df95f45 Mon Sep 17 00:00:00 2001 From: mohit Date: Fri, 13 Oct 2017 04:53:32 +0000 Subject: [PATCH 4/5] move StatusReason out of WorkflowSummary also simplify memory GetWorkflows removal of fields for summaryOnly=true --- gen-go/models/workflow.go | 3 +++ gen-go/models/workflow_summary.go | 3 --- store/dynamodb/workflow.go | 1 - store/memory/memory_store.go | 6 +++++- store/tests/tests.go | 10 +++++++--- swagger.yml | 4 ++-- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/gen-go/models/workflow.go b/gen-go/models/workflow.go index cffdd25b..05776947 100644 --- a/gen-go/models/workflow.go +++ b/gen-go/models/workflow.go @@ -17,6 +17,9 @@ type Workflow struct { // jobs Jobs []*Job `json:"jobs"` + + // status reason + StatusReason string `json:"statusReason,omitempty"` } // Validate validates this workflow diff --git a/gen-go/models/workflow_summary.go b/gen-go/models/workflow_summary.go index 19f9ad35..89d38bb4 100644 --- a/gen-go/models/workflow_summary.go +++ b/gen-go/models/workflow_summary.go @@ -41,9 +41,6 @@ type WorkflowSummary struct { // status Status WorkflowStatus `json:"status,omitempty"` - // status reason - StatusReason string `json:"statusReason,omitempty"` - // tags: object with key-value pairs; keys and values should be strings Tags map[string]interface{} `json:"tags,omitempty"` diff --git a/store/dynamodb/workflow.go b/store/dynamodb/workflow.go index a08efd05..15782634 100644 --- a/store/dynamodb/workflow.go +++ b/store/dynamodb/workflow.go @@ -24,7 +24,6 @@ var SummaryKeys = []string{ "Workflow.retries", "Workflow.retryFor", "Workflow.#S", // status - "Workflow.statusReason", "Workflow.tags", "Workflow.workflowDefinition.#N", diff --git a/store/memory/memory_store.go b/store/memory/memory_store.go index 3f7c5007..cc1aa9d0 100644 --- a/store/memory/memory_store.go +++ b/store/memory/memory_store.go @@ -168,7 +168,11 @@ func (s MemoryStore) GetWorkflows( for _, workflow := range s.workflows { if s.matchesQuery(workflow, query) { if aws.BoolValue(query.SummaryOnly) { - workflow.Jobs = nil + // remove everything but WorkflowSummary + workflow = models.Workflow{ + WorkflowSummary: workflow.WorkflowSummary, + } + // we need to minimize the WorkflowDefinition workflow.WorkflowDefinition = &models.WorkflowDefinition{ Name: workflow.WorkflowDefinition.Name, Version: workflow.WorkflowDefinition.Version, diff --git a/store/tests/tests.go b/store/tests/tests.go index de8d8e13..5dec0663 100644 --- a/store/tests/tests.go +++ b/store/tests/tests.go @@ -1,7 +1,6 @@ package tests import ( - "fmt" "reflect" "testing" "time" @@ -285,6 +284,9 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { ID: "job1", Input: `["job input"]`, }} + workflow.Retries = []string{"x"} + workflow.RetryFor = "y" + workflow.StatusReason = "test reason" require.NoError(t, s.SaveWorkflow(*workflow)) // Verify details are excluded if SummaryOnly == true: @@ -295,6 +297,7 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { }) require.NoError(t, err) require.Nil(t, workflows[0].Jobs) + require.Empty(t, workflows[0].StatusReason) definitionSummary := &models.WorkflowDefinition{ Name: definition.Name, @@ -307,8 +310,8 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { workflowVal := reflect.ValueOf(workflows[0]) for i := 0; i < wsType.NumField(); i++ { name := wsType.Field(i).Name - fmt.Println(i, workflowVal.FieldByName(name).String()) - assert.NotNil(t, workflowVal.FieldByName(name).String()) + assert.NotNil(t, workflowVal.FieldByName(name).String(), "Field Name: %s", name) + assert.NotEmpty(t, workflowVal.FieldByName(name).String(), "Field Name: %s", name) } // Verify details are included if SummaryOnly == false: @@ -321,6 +324,7 @@ func GetWorkflowsSummaryOnly(s store.Store, t *testing.T) func(t *testing.T) { require.Equal(t, workflow.Jobs, workflows[0].Jobs) require.Equal(t, definition.Name, workflows[0].WorkflowDefinition.Name) require.Equal(t, definition.Version, workflows[0].WorkflowDefinition.Version) + require.Equal(t, workflow.StatusReason, workflows[0].StatusReason) require.Equal( t, len(definition.StateMachine.States), diff --git a/swagger.yml b/swagger.yml index de2f3a0d..1fbc3a2e 100644 --- a/swagger.yml +++ b/swagger.yml @@ -377,6 +377,8 @@ definitions: - $ref: '#/definitions/WorkflowSummary' - type: object properties: + statusReason: + type: string jobs: type: array items: @@ -397,8 +399,6 @@ definitions: $ref: '#/definitions/WorkflowDefinition' status: $ref: '#/definitions/WorkflowStatus' - statusReason: - type: string namespace: type: string queue: From 53e83d2ef59389eecd9db2fdaa49131e90b151ac Mon Sep 17 00:00:00 2001 From: mohit Date: Fri, 13 Oct 2017 04:55:11 +0000 Subject: [PATCH 5/5] patch bump since we will have new clients/models --- swagger.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swagger.yml b/swagger.yml index 1fbc3a2e..2bea1606 100644 --- a/swagger.yml +++ b/swagger.yml @@ -5,7 +5,7 @@ info: # when changing the version here, make sure to # 1. re-run `make generate` to generate clients and server # 2. merge the new clients - version: 0.6.3 + version: 0.6.4 x-npm-package: workflow-manager schemes: - http