Skip to content

Commit

Permalink
Merge pull request #41 from Clever/INFRA-2430-add-namespace-and-queue…
Browse files Browse the repository at this point in the history
…-to-workflow-model

[INFRA-2430] add input, namespace, and queue to workflow model
  • Loading branch information
bgveenstra authored Sep 22, 2017
2 parents 60f167e + 02148c2 commit 2ddbe5b
Show file tree
Hide file tree
Showing 23 changed files with 324 additions and 205 deletions.
5 changes: 4 additions & 1 deletion executor/batchclient/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/go-openapi/strfmt"
)

// DefaultQueue contains the swagger api's default value for queue input
var DefaultQueue = "default"

// DependenciesEnvVarName is injected for every task
// with a list of dependency ids
const DependenciesEnvVarName = "_BATCH_DEPENDENCIES"
Expand Down Expand Up @@ -258,7 +261,7 @@ func (be BatchExecutor) SubmitWorkflow(name string, definition string, dependenc
func (be BatchExecutor) getJobQueue(queue string) (string, error) {
var ok bool
jobQueue := be.defaultQueue
if queue != "" {
if queue != DefaultQueue {
// use a custom queue
jobQueue, ok = be.customQueues[queue]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion executor/batchclient/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestSubmitWorkflowToCustomQueue(t *testing.T) {
mockClient.EXPECT().SubmitJob(gomock.Any()).Return(&batch.SubmitJobOutput{
JobId: aws.String("job-id"),
}, nil)
out, err := be.SubmitWorkflow(name, definition, dependencies, input, "", 0)
out, err := be.SubmitWorkflow(name, definition, dependencies, input, DefaultQueue, 0)
assert.NoError(t, err)
assert.Equal(t, "job-id", out)

Expand Down
2 changes: 1 addition & 1 deletion executor/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (wm BatchWorkflowManager) UpdateWorkflowStatus(workflow *resources.Workflow

// CreateWorkflow can be used to create a new workflow for a given WorkflowDefinition
func (wm BatchWorkflowManager) CreateWorkflow(def resources.WorkflowDefinition, input []string, namespace string, queue string) (*resources.Workflow, error) {
workflow := resources.NewWorkflow(def, input) // TODO: add namespace to Workflow struct
workflow := resources.NewWorkflow(def, input, namespace, queue)
logWorkflowStatusChange(workflow, "")

stateResources, err := wm.getStateResources(workflow, namespace)
Expand Down
11 changes: 7 additions & 4 deletions executor/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestCancelUpdates(t *testing.T) {

// TestCreateWorkflow tests that jobs are created for a workflow in the right order
// with the appropriate settings
// @todo - panic "concurrent map writes"
func TestCreateWorkflow(t *testing.T) {
mockClient := &mockBatchClient{
map[string]*resources.Job{},
Expand Down Expand Up @@ -276,27 +277,29 @@ func TestGetStateResources(t *testing.T) {
}
wf := resources.KitchenSinkWorkflowDefinition(t)
input := []string{"test-start-input", "arg2"}
namespace := "my-env"
queue := "queue"

t.Log("Works without providing a namespace for CreateWorkflow")
stateResources, err := jm.getStateResources(resources.NewWorkflow(wf, input), "")
stateResources, err := jm.getStateResources(resources.NewWorkflow(wf, input, namespace, queue), "")
assert.Nil(t, err)
for k, stateResource := range stateResources {
assert.Equal(t, wf.StatesMap[k].Resource(), stateResource.URI)
}

t.Log("Fails when using a namespace for CreateWorkflow without StateResource")
stateResources, err = jm.getStateResources(resources.NewWorkflow(wf, input), "does-not-exist")
stateResources, err = jm.getStateResources(resources.NewWorkflow(wf, input, namespace, queue), "does-not-exist")
assert.Error(t, err, fmt.Sprintf("StateResource `%s:%s` Not Found: %s",
"does-not-exist", "fake-resource-1", "does-not-exist--fake-resource-1"))

t.Log("Works when using a namespace for CreateWorkflow")
for _, i := range []int{1, 2, 3} {
store.SaveStateResource(resources.NewBatchResource(
fmt.Sprintf("fake-resource-%d", i),
"my-env",
namespace,
fmt.Sprintf("arn:batch:jobdefinition:%d", i)))
}
stateResources, err = jm.getStateResources(resources.NewWorkflow(wf, input), "my-env")
stateResources, err = jm.getStateResources(resources.NewWorkflow(wf, input, namespace, queue), "my-env")
assert.Nil(t, err)
assert.Equal(t, stateResources["start-state"].Name, "fake-resource-1")
assert.Equal(t, stateResources["start-state"].Namespace, "my-env")
Expand Down
2 changes: 1 addition & 1 deletion gen-go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func (c *WagClient) doGetWorkflowsRequest(ctx context.Context, req *http.Request
// 404: *models.NotFound
// 500: *models.InternalError
// default: client side HTTP errors, for example: context.DeadlineExceeded.
func (c *WagClient) StartWorkflow(ctx context.Context, i *models.WorkflowInput) (*models.Workflow, error) {
func (c *WagClient) StartWorkflow(ctx context.Context, i *models.StartWorkflowRequest) (*models.Workflow, error) {
headers := make(map[string]string)

var body []byte
Expand Down
2 changes: 1 addition & 1 deletion gen-go/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Client interface {
// 404: *models.NotFound
// 500: *models.InternalError
// default: client side HTTP errors, for example: context.DeadlineExceeded.
StartWorkflow(ctx context.Context, i *models.WorkflowInput) (*models.Workflow, error)
StartWorkflow(ctx context.Context, i *models.StartWorkflowRequest) (*models.Workflow, error)

// CancelWorkflow makes a DELETE request to /workflows/{workflowId}
//
Expand Down
160 changes: 96 additions & 64 deletions gen-go/client/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2ddbe5b

Please sign in to comment.