From a5ce81648c344fd352c01e61363441c4d08b9ec3 Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Sat, 31 Aug 2019 20:06:50 -0700 Subject: [PATCH 1/5] Initial tracing for SFN and SQS api calls. --- handler.go | 2 +- main.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/handler.go b/handler.go index 4481a641..511c3693 100644 --- a/handler.go +++ b/handler.go @@ -221,7 +221,7 @@ func (h Handler) GetWorkflows( indexName = "clever-dev-workflow-manager-dev-v3-workflows" } req := []func(*esapi.SearchRequest){ - h.es.Search.WithContext(context.Background()), + h.es.Search.WithContext(ctx), h.es.Search.WithIndex(indexName), h.es.Search.WithFrom(0), h.es.Search.WithSourceIncludes( diff --git a/main.go b/main.go index 7db12474..85de1f1f 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,11 @@ import ( "github.com/aws/aws-sdk-go/service/sfn" "github.com/aws/aws-sdk-go/service/sqs" elasticsearch "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "github.com/elastic/go-elasticsearch/v6/estransport" "github.com/kardianos/osext" + otaws "github.com/opentracing-contrib/go-aws-sdk" + "github.com/opentracing/opentracing-go" counter "github.com/Clever/aws-sdk-go-counter" "github.com/Clever/workflow-manager/executor" @@ -91,12 +95,19 @@ func main() { } sqsapi := sqs.New(session.New(), aws.NewConfig().WithRegion(c.SQSRegion)) + + // Add OpenTracing Handlers + // Note that Dynamo has automatically OpenTracing through wag's dynamo code + otaws.AddOTHandlers(countedSFNAPI.Client) + otaws.AddOTHandlers(sqsapi.Client) + wfmSFN := executor.NewSFNWorkflowManager(cachedSFNAPI, sqsapi, db, c.SFNRoleARN, c.SFNRegion, c.SFNAccountID, c.SQSQueueURL) es, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{c.ESURL}}) if err != nil { log.Fatal(err) } + es = tracedClient(es) h := Handler{ store: db, @@ -185,3 +196,32 @@ func logSFNCounts(sfnCounter *counter.Counter) { executor.LogSFNCounts(sfnCounter.Counters()) } } + +// Below provides a thin wrapper around the elasticsearch Client with opentracing added in +// The Client consists of a Transport object which handles the http requests, and an API object +// which makes calls to the Transport. We take the old transport, wrap it with the tracing, +// then build a new API object from it. + +type tracedESTransport struct { + child estransport.Interface +} + +func (t tracedESTransport) Perform(req *http.Request) (*http.Response, error) { + span, ctx := opentracing.StartSpanFromContext(req.Context(), "elasticsearch request") + // TODO: add some fields from the req + req = req.WithContext(ctx) + resp, err := t.child.Perform(req) + span.Finish() + return resp, err +} + +func tracedClient(client *elasticsearch.Client) *elasticsearch.Client { + + tracedTransport := tracedESTransport{ + child: client.Transport, + } + return &elasticsearch.Client{ + API: esapi.New(tracedTransport), + Transport: tracedTransport, + } +} From db70a000a1e8b3e92fd64aa6a601bd5198cb141a Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Mon, 2 Sep 2019 13:37:50 -0700 Subject: [PATCH 2/5] Additional tracing information. * Tags for elasticsearch requests * Tracing for the workflow update loop * Add context to the AWS api calls to track child spans better --- executor/workflow_manager.go | 14 +- executor/workflow_manager_sfn.go | 51 +++-- gen-js/index.d.ts | 382 +++++++++++++++++++++++++++++++ gen-js/package.json | 3 + main.go | 21 +- 5 files changed, 441 insertions(+), 30 deletions(-) create mode 100644 gen-js/index.d.ts diff --git a/executor/workflow_manager.go b/executor/workflow_manager.go index 55fc95d7..660c4206 100644 --- a/executor/workflow_manager.go +++ b/executor/workflow_manager.go @@ -15,6 +15,8 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" ) // WorkflowManager is the interface for creating, stopping and checking status for Workflows @@ -37,17 +39,21 @@ func PollForPendingWorkflowsAndUpdateStore(ctx context.Context, wm WorkflowManag log.Info("poll-for-pending-workflows-done") return default: - out, err := sqsapi.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ + span, innerCtx := opentracing.StartSpanFromContext(ctx, "updating-pending-workflows") + + out, err := sqsapi.ReceiveMessageWithContext(innerCtx, &sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(10), QueueUrl: aws.String(sqsQueueURL), WaitTimeSeconds: aws.Int64(10), }) if err != nil { log.ErrorD("poll-for-pending-workflows", logger.M{"error": err.Error()}) + ext.Error.Set(span, true) + span.SetTag("errorMessage", err.Error()) } for _, message := range out.Messages { - if id, err := updatePendingWorkflow(ctx, message, wm, thestore, sqsapi, sqsQueueURL); err != nil { + if id, err := updatePendingWorkflow(innerCtx, message, wm, thestore, sqsapi, sqsQueueURL); err != nil { log.ErrorD("update-pending-workflow", logger.M{"id": id, "error": err.Error()}) // If we're seeing DynamoDB throttling, let's wait before running our next poll loop @@ -62,6 +68,7 @@ func PollForPendingWorkflowsAndUpdateStore(ctx context.Context, wm WorkflowManag log.InfoD("update-pending-workflow", logger.M{"id": id}) } } + span.Finish() } } } @@ -88,6 +95,8 @@ func createPendingWorkflow(ctx context.Context, workflowID string, sqsapi sqsifa } func updatePendingWorkflow(ctx context.Context, m *sqs.Message, wm WorkflowManager, thestore store.Store, sqsapi sqsiface.SQSAPI, sqsQueueURL string) (string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "workflow-update") + defer span.Finish() deleteMsg := func() { if _, err := sqsapi.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ QueueUrl: aws.String(sqsQueueURL), @@ -107,6 +116,7 @@ func updatePendingWorkflow(ctx context.Context, m *sqs.Message, wm WorkflowManag } wfID := *m.Body + span.SetTag("wf-id", wfID) wf, err := thestore.GetWorkflowByID(ctx, wfID) if err != nil { if _, ok := err.(models.NotFound); ok { diff --git a/executor/workflow_manager_sfn.go b/executor/workflow_manager_sfn.go index 486fef3e..b1b78f28 100644 --- a/executor/workflow_manager_sfn.go +++ b/executor/workflow_manager_sfn.go @@ -125,10 +125,11 @@ func toSFNTags(wmTags map[string]interface{}) []*sfn.Tag { return sfnTags } -func (wm *SFNWorkflowManager) describeOrCreateStateMachine(wd models.WorkflowDefinition, namespace, queue string) (*sfn.DescribeStateMachineOutput, error) { - describeOutput, err := wm.sfnapi.DescribeStateMachine(&sfn.DescribeStateMachineInput{ - StateMachineArn: aws.String(sfnconventions.StateMachineArn(wm.region, wm.accountID, wd.Name, wd.Version, namespace, wd.StateMachine.StartAt)), - }) +func (wm *SFNWorkflowManager) describeOrCreateStateMachine(ctx context.Context, wd models.WorkflowDefinition, namespace, queue string) (*sfn.DescribeStateMachineOutput, error) { + describeOutput, err := wm.sfnapi.DescribeStateMachineWithContext(ctx, + &sfn.DescribeStateMachineInput{ + StateMachineArn: aws.String(sfnconventions.StateMachineArn(wm.region, wm.accountID, wd.Name, wd.Version, namespace, wd.StateMachine.StartAt)), + }) if err == nil { return describeOutput, nil } @@ -152,25 +153,26 @@ func (wm *SFNWorkflowManager) describeOrCreateStateMachine(wd models.WorkflowDef // this effectively creates a new workflow definition in each namespace we deploy into awsStateMachineName := sfnconventions.StateMachineName(wd.Name, wd.Version, namespace, wd.StateMachine.StartAt) log.InfoD("create-state-machine", logger.M{"definition": awsStateMachineDef, "name": awsStateMachineName}) - _, err = wm.sfnapi.CreateStateMachine(&sfn.CreateStateMachineInput{ - Name: aws.String(awsStateMachineName), - Definition: aws.String(awsStateMachineDef), - RoleArn: aws.String(wm.roleARN), - Tags: append([]*sfn.Tag{ - {Key: aws.String("environment"), Value: aws.String(namespace)}, - {Key: aws.String("workflow-definition-name"), Value: aws.String(wd.Name)}, - {Key: aws.String("workflow-definition-version"), Value: aws.String(fmt.Sprintf("%d", wd.Version))}, - {Key: aws.String("workflow-definition-start-at"), Value: aws.String(wd.StateMachine.StartAt)}, - }, toSFNTags(wd.DefaultTags)...), - }) + _, err = wm.sfnapi.CreateStateMachineWithContext(ctx, + &sfn.CreateStateMachineInput{ + Name: aws.String(awsStateMachineName), + Definition: aws.String(awsStateMachineDef), + RoleArn: aws.String(wm.roleARN), + Tags: append([]*sfn.Tag{ + {Key: aws.String("environment"), Value: aws.String(namespace)}, + {Key: aws.String("workflow-definition-name"), Value: aws.String(wd.Name)}, + {Key: aws.String("workflow-definition-version"), Value: aws.String(fmt.Sprintf("%d", wd.Version))}, + {Key: aws.String("workflow-definition-start-at"), Value: aws.String(wd.StateMachine.StartAt)}, + }, toSFNTags(wd.DefaultTags)...), + }) if err != nil { return nil, fmt.Errorf("CreateStateMachine error: %s", err.Error()) } - return wm.describeOrCreateStateMachine(wd, namespace, queue) + return wm.describeOrCreateStateMachine(ctx, wd, namespace, queue) } -func (wm *SFNWorkflowManager) startExecution(stateMachineArn *string, workflowID, input string) error { +func (wm *SFNWorkflowManager) startExecution(ctx context.Context, stateMachineArn *string, workflowID, input string) error { executionName := aws.String(workflowID) var inputJSON map[string]interface{} @@ -192,7 +194,7 @@ func (wm *SFNWorkflowManager) startExecution(stateMachineArn *string, workflowID // - aws.String(""): leads to InvalidExecutionInput AWS error // - aws.String("[]"): leads to an input of an empty array "[]" startExecutionInput := aws.String(string(marshaledInput)) - _, err = wm.sfnapi.StartExecution(&sfn.StartExecutionInput{ + _, err = wm.sfnapi.StartExecutionWithContext(ctx, &sfn.StartExecutionInput{ StateMachineArn: stateMachineArn, Input: startExecutionInput, Name: executionName, @@ -207,7 +209,7 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work queue string, tags map[string]interface{}) (*models.Workflow, error) { - describeOutput, err := wm.describeOrCreateStateMachine(wd, namespace, queue) + describeOutput, err := wm.describeOrCreateStateMachine(ctx, wd, namespace, queue) if err != nil { return nil, err } @@ -231,7 +233,7 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work } // submit an execution using input, set execution name == our workflow GUID - err = wm.startExecution(describeOutput.StateMachineArn, workflow.ID, input) + err = wm.startExecution(ctx, describeOutput.StateMachineArn, workflow.ID, input) if err != nil { // since we failed to start execution, remove Workflow from store if delErr := wm.store.DeleteWorkflowByID(ctx, workflow.ID); delErr != nil { @@ -267,7 +269,7 @@ func (wm *SFNWorkflowManager) RetryWorkflow(ctx context.Context, ogWorkflow mode if err := resources.RemoveInactiveStates(newDef.StateMachine); err != nil { return nil, err } - describeOutput, err := wm.describeOrCreateStateMachine(newDef, ogWorkflow.Namespace, ogWorkflow.Queue) + describeOutput, err := wm.describeOrCreateStateMachine(ctx, newDef, ogWorkflow.Namespace, ogWorkflow.Queue) if err != nil { return nil, err } @@ -287,7 +289,7 @@ func (wm *SFNWorkflowManager) RetryWorkflow(ctx context.Context, ogWorkflow mode } // submit an execution using input, set execution name == our workflow GUID - err = wm.startExecution(describeOutput.StateMachineArn, workflow.ID, input) + err = wm.startExecution(ctx, describeOutput.StateMachineArn, workflow.ID, input) if err != nil { return nil, err } @@ -308,7 +310,7 @@ func (wm *SFNWorkflowManager) CancelWorkflow(ctx context.Context, workflow *mode wd := workflow.WorkflowDefinition execARN := wm.executionArn(workflow, wd) - if _, err := wm.sfnapi.StopExecution(&sfn.StopExecutionInput{ + if _, err := wm.sfnapi.StopExecutionWithContext(ctx, &sfn.StopExecutionInput{ ExecutionArn: aws.String(execARN), Cause: aws.String(reason), // Error: aws.String(""), // TODO: Can we use this? "An arbitrary error code that identifies the cause of the termination." @@ -415,8 +417,7 @@ func (wm *SFNWorkflowManager) UpdateWorkflowHistory(ctx context.Context, workflo // Setup a context with a timeout of one minute since // we don't want to pull very large workflow histories - // TODO: this should be a context passed by the handler - ctx, cancel := context.WithTimeout(context.Background(), durationToFetchHistoryPages) + ctx, cancel := context.WithTimeout(ctx, durationToFetchHistoryPages) defer cancel() var jobs []*models.Job diff --git a/gen-js/index.d.ts b/gen-js/index.d.ts new file mode 100644 index 00000000..a5defbd1 --- /dev/null +++ b/gen-js/index.d.ts @@ -0,0 +1,382 @@ +import { Span, Tracer } from "opentracing"; +import { Logger } from "kayvee"; + +type Callback = (err: Error, result: R) => void; +type ArrayInner = R extends (infer T)[] ? T : never; + +interface RetryPolicy { + backoffs(): number[]; + retry(requestOptions: {method: string}, err: Error, res: {statusCode: number}): boolean; +} + +interface RequestOptions { + timeout?: number; + span?: Span; + retryPolicy?: RetryPolicy; +} + +interface IterResult { + map(f: (r: R) => T, cb?: Callback): Promise; + toArray(cb?: Callback): Promise; + forEach(f: (r: R) => void, cb?: Callback): Promise; +} + +interface CircuitOptions { + forceClosed?: boolean; + maxConcurrentRequests?: number; + requestVolumeThreshold?: number; + sleepWindow?: number; + errorPercentThreshold?: number; +} + +interface GenericOptions { + timeout?: number; + keepalive?: boolean; + retryPolicy?: RetryPolicy; + logger?: Logger; + tracer?: Tracer; + circuit?: CircuitOptions; +} + +interface DiscoveryOptions { + discovery: true; + address?: undefined; +} + +interface AddressOptions { + discovery?: false; + address: string; +} + +type WorkflowManagerOptions = (DiscoveryOptions | AddressOptions) & GenericOptions; + + +type CancelReason = { + reason?: string; +}; + +type CancelWorkflowParams = { + workflowID: string; + reason: CancelReason; +}; + +type Conflict = { + message?: string; +}; + +type DeleteStateResourceParams = { + namespace: string; + name: string; +}; + +type GetStateResourceParams = { + namespace: string; + name: string; +}; + +type GetWorkflowDefinitionByNameAndVersionParams = { + name: string; + version: number; +}; + +type GetWorkflowDefinitionVersionsByNameParams = { + name: string; + latest?: boolean; +}; + +type GetWorkflowsParams = { + limit?: number; + oldestFirst?: boolean; + pageToken?: string; + status?: string; + resolvedByUser?: boolean; + summaryOnly?: boolean; + workflowDefinitionName: string; +}; + +type Job = { + attempts?: JobAttempt[]; + container?: string; + createdAt?: string; + id?: string; + input?: string; + name?: string; + output?: string; + queue?: string; + startedAt?: string; + state?: string; + stateResource?: StateResource; + status?: JobStatus; + statusReason?: string; + stoppedAt?: string; +}; + +type JobAttempt = { + containerInstanceARN?: string; + createdAt?: string; + exitCode?: number; + reason?: string; + startedAt?: string; + stoppedAt?: string; + taskARN?: string; +}; + +type JobStatus = ("created" | "queued" | "waiting_for_deps" | "running" | "succeeded" | "failed" | "aborted_deps_failed" | "aborted_by_user"); + +type Manager = ("step-functions"); + +type NewStateResource = { + name?: string; + namespace?: string; + uri?: string; +}; + +type NewWorkflowDefinitionRequest = { + defaultTags?: { [key: string]: { + +} }; + manager?: Manager; + name?: string; + stateMachine?: SLStateMachine; + version?: number; +}; + +type PutStateResourceParams = { + namespace: string; + name: string; + NewStateResource?: NewStateResource; +}; + +type ResolvedByUserWrapper = { + isSet?: boolean; + value?: boolean; +}; + +type ResumeWorkflowByIDParams = { + workflowID: string; + overrides: WorkflowDefinitionOverrides; +}; + +type SLCatcher = { + ErrorEquals?: SLErrorEquals[]; + Next?: string; + ResultPath?: string; +}; + +type SLChoice = { + And?: SLChoice[]; + BooleanEquals?: boolean; + Next?: string; + Not?: SLChoice; + NumericEquals?: number; + NumericGreaterThan?: number; + NumericGreaterThanEquals?: number; + NumericLessThan?: number; + NumericLessThanEquals?: number; + Or?: SLChoice[]; + StringEquals?: string; + StringGreaterThan?: string; + StringGreaterThanEquals?: string; + StringLessThan?: string; + StringLessThanEquals?: string; + TimestampEquals?: string; + TimestampGreaterThan?: string; + TimestampGreaterThanEquals?: string; + TimestampLessThan?: string; + TimestampLessThanEquals?: string; + Variable?: string; +}; + +type SLErrorEquals = string; + +type SLRetrier = { + BackoffRate?: number; + ErrorEquals?: SLErrorEquals[]; + IntervalSeconds?: number; + MaxAttempts?: number; +}; + +type SLState = { + Branches?: SLStateMachine[]; + Catch?: SLCatcher[]; + Cause?: string; + Choices?: SLChoice[]; + Comment?: string; + Default?: string; + End?: boolean; + Error?: string; + HeartbeatSeconds?: number; + InputPath?: string; + Next?: string; + OutputPath?: string; + Resource?: string; + Result?: string; + ResultPath?: string; + Retry?: SLRetrier[]; + Seconds?: number; + SecondsPath?: string; + TimeoutSeconds?: number; + Timestamp?: string; + TimestampPath?: string; + Type?: SLStateType; +}; + +type SLStateMachine = { + Comment?: string; + StartAt?: string; + States?: { [key: string]: SLState }; + TimeoutSeconds?: number; + Version?: ("1.0"); +}; + +type SLStateType = ("Pass" | "Task" | "Choice" | "Wait" | "Succeed" | "Fail" | "Parallel"); + +type StartWorkflowRequest = { + input?: string; + namespace?: string; + queue?: string; + tags?: { [key: string]: { + +} }; + workflowDefinition?: WorkflowDefinitionRef; +}; + +type StateResource = { + lastUpdated?: string; + name?: string; + namespace?: string; + type?: StateResourceType; + uri?: string; +}; + +type StateResourceType = ("JobDefinitionARN" | "ActivityARN" | "LambdaFunctionARN"); + +type UpdateWorkflowDefinitionParams = { + NewWorkflowDefinitionRequest?: NewWorkflowDefinitionRequest; + name: string; +}; + +type Workflow = any; + +type WorkflowDefinition = { + createdAt?: string; + defaultTags?: { [key: string]: { + +} }; + id?: string; + manager?: Manager; + name?: string; + stateMachine?: SLStateMachine; + version?: number; +}; + +type WorkflowDefinitionOverrides = { + StartAt?: string; +}; + +type WorkflowDefinitionRef = { + name?: string; + version?: number; +}; + +type WorkflowQuery = { + limit?: number; + oldestFirst?: boolean; + pageToken?: string; + resolvedByUserWrapper?: ResolvedByUserWrapper; + status?: WorkflowStatus; + summaryOnly?: boolean; + workflowDefinitionName: string; +}; + +type WorkflowStatus = ("queued" | "running" | "failed" | "succeeded" | "cancelled"); + +type WorkflowSummary = { + createdAt?: string; + id?: string; + input?: string; + lastJob?: Job; + lastUpdated?: string; + namespace?: string; + queue?: string; + resolvedByUser?: boolean; + retries?: string[]; + retryFor?: string; + status?: WorkflowStatus; + stoppedAt?: string; + tags?: { [key: string]: { + +} }; + workflowDefinition?: WorkflowDefinition; +}; + +declare class WorkflowManager { + constructor(options: WorkflowManagerOptions); + + + healthCheck(options?: RequestOptions, cb?: Callback): Promise + + postStateResource(NewStateResource?: NewStateResource, options?: RequestOptions, cb?: Callback): Promise + + deleteStateResource(params: DeleteStateResourceParams, options?: RequestOptions, cb?: Callback): Promise + + getStateResource(params: GetStateResourceParams, options?: RequestOptions, cb?: Callback): Promise + + putStateResource(params: PutStateResourceParams, options?: RequestOptions, cb?: Callback): Promise + + getWorkflowDefinitions(options?: RequestOptions, cb?: Callback): Promise + + newWorkflowDefinition(NewWorkflowDefinitionRequest?: NewWorkflowDefinitionRequest, options?: RequestOptions, cb?: Callback): Promise + + getWorkflowDefinitionVersionsByName(params: GetWorkflowDefinitionVersionsByNameParams, options?: RequestOptions, cb?: Callback): Promise + + updateWorkflowDefinition(params: UpdateWorkflowDefinitionParams, options?: RequestOptions, cb?: Callback): Promise + + getWorkflowDefinitionByNameAndVersion(params: GetWorkflowDefinitionByNameAndVersionParams, options?: RequestOptions, cb?: Callback): Promise + + getWorkflows(params: GetWorkflowsParams, options?: RequestOptions, cb?: Callback): Promise + getWorkflowsIter(params: GetWorkflowsParams, options?: RequestOptions): IterResult> + + startWorkflow(StartWorkflowRequest?: StartWorkflowRequest, options?: RequestOptions, cb?: Callback): Promise + + CancelWorkflow(params: CancelWorkflowParams, options?: RequestOptions, cb?: Callback): Promise + + getWorkflowByID(workflowID: string, options?: RequestOptions, cb?: Callback): Promise + + resumeWorkflowByID(params: ResumeWorkflowByIDParams, options?: RequestOptions, cb?: Callback): Promise + + resolveWorkflowByID(workflowID: string, options?: RequestOptions, cb?: Callback): Promise + +} + +declare namespace WorkflowManager { + const RetryPolicies: { + Single: RetryPolicy; + Exponential: RetryPolicy; + None: RetryPolicy; + } + + const DefaultCircuitOptions: CircuitOptions; + + namespace Errors { + + class BadRequest { + message?: string; +} + + class InternalError { + message?: string; +} + + class NotFound { + message?: string; +} + + class Conflict { + message?: string; +} + + } +} + +export = WorkflowManager; diff --git a/gen-js/package.json b/gen-js/package.json index 5730a9be..8870bbcb 100644 --- a/gen-js/package.json +++ b/gen-js/package.json @@ -11,5 +11,8 @@ "kayvee": "^3.8.2", "hystrixjs": "^0.2.0", "rxjs": "^5.4.1" + }, + "devDependencies": { + "typescript": "^3.3.0" } } diff --git a/main.go b/main.go index 85de1f1f..90087451 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "github.com/kardianos/osext" otaws "github.com/opentracing-contrib/go-aws-sdk" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" counter "github.com/Clever/aws-sdk-go-counter" "github.com/Clever/workflow-manager/executor" @@ -207,11 +208,25 @@ type tracedESTransport struct { } func (t tracedESTransport) Perform(req *http.Request) (*http.Response, error) { - span, ctx := opentracing.StartSpanFromContext(req.Context(), "elasticsearch request") - // TODO: add some fields from the req + span, ctx := opentracing.StartSpanFromContext(req.Context(), "elasticsearch-request") + defer span.Finish() + + // These fields mirror the ones in the aws-sdk-go opentracing package + ext.SpanKindRPCClient.Set(span) + ext.Component.Set(span, "go-elasticsearch") + ext.HTTPMethod.Set(span, req.Method) + ext.HTTPUrl.Set(span, req.URL.String()) + ext.PeerService.Set(span, "elasticsearch") + req = req.WithContext(ctx) resp, err := t.child.Perform(req) - span.Finish() + + if err != nil { + ext.Error.Set(span, true) + } else { + ext.HTTPStatusCode.Set(span, uint16(resp.StatusCode)) + } + return resp, err } From 0c94c5af86dd997d4bca4576f11404e32b51f47f Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Mon, 2 Sep 2019 14:16:43 -0700 Subject: [PATCH 3/5] Update tests -- we now call aws api with context --- executor/workflow_manager_sfn_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/executor/workflow_manager_sfn_test.go b/executor/workflow_manager_sfn_test.go index 719d27b4..eb9c24b2 100644 --- a/executor/workflow_manager_sfn_test.go +++ b/executor/workflow_manager_sfn_test.go @@ -153,14 +153,14 @@ func TestCreateWorkflow(t *testing.T) { c.workflowDefinition.StateMachine.StartAt, ) c.mockSFNAPI.EXPECT(). - DescribeStateMachine(&sfn.DescribeStateMachineInput{ + DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{ StateMachineArn: aws.String(stateMachineArn), }). Return(&sfn.DescribeStateMachineOutput{ StateMachineArn: aws.String(stateMachineArn), }, nil) c.mockSFNAPI.EXPECT(). - StartExecution(gomock.Any()). + StartExecutionWithContext(gomock.Any(), gomock.Any()). Return(&sfn.StartExecutionOutput{}, nil) c.mockSQSAPI.EXPECT(). SendMessageWithContext(gomock.Any(), gomock.Any()). @@ -240,14 +240,14 @@ func TestCreateWorkflow(t *testing.T) { c.workflowDefinition.StateMachine.StartAt, ) c.mockSFNAPI.EXPECT(). - DescribeStateMachine(&sfn.DescribeStateMachineInput{ + DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{ StateMachineArn: aws.String(stateMachineArn), }). Return(&sfn.DescribeStateMachineOutput{ StateMachineArn: aws.String(stateMachineArn), }, nil) c.mockSFNAPI.EXPECT(). - StartExecution(gomock.Any()). + StartExecutionWithContext(gomock.Any(), gomock.Any()). Return(&sfn.StartExecutionOutput{}, nil) c.mockSQSAPI.EXPECT(). SendMessageWithContext(gomock.Any(), gomock.Any()). @@ -291,14 +291,14 @@ func TestCreateWorkflow(t *testing.T) { ) awsError := awserr.New("test", "test", errors.New("")) c.mockSFNAPI.EXPECT(). - DescribeStateMachine(&sfn.DescribeStateMachineInput{ + DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{ StateMachineArn: aws.String(stateMachineArn), }). Return(&sfn.DescribeStateMachineOutput{ StateMachineArn: aws.String(stateMachineArn), }, nil) c.mockSFNAPI.EXPECT(). - StartExecution(gomock.Any()). + StartExecutionWithContext(gomock.Any(), gomock.Any()). Return(nil, awsError) workflow, err := c.manager.CreateWorkflow(ctx, *c.workflowDefinition, @@ -331,14 +331,14 @@ func TestRetryWorkflow(t *testing.T) { c.workflowDefinition.StateMachine.StartAt, ) c.mockSFNAPI.EXPECT(). - DescribeStateMachine(&sfn.DescribeStateMachineInput{ + DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{ StateMachineArn: aws.String(stateMachineArn), }). Return(&sfn.DescribeStateMachineOutput{ StateMachineArn: aws.String(stateMachineArn), }, nil) c.mockSFNAPI.EXPECT(). - StartExecution(gomock.Any()). + StartExecutionWithContext(gomock.Any(), gomock.Any()). Return(&sfn.StartExecutionOutput{}, nil) c.mockSQSAPI.EXPECT(). SendMessageWithContext(gomock.Any(), gomock.Any()). @@ -370,12 +370,12 @@ func TestRetryWorkflow(t *testing.T) { workflow.Status = models.WorkflowStatusFailed c.mockSFNAPI.EXPECT(). - DescribeStateMachine(gomock.Any()). + DescribeStateMachineWithContext(gomock.Any(), gomock.Any()). Return(&sfn.DescribeStateMachineOutput{ StateMachineArn: aws.String(stateMachineArn), }, nil) c.mockSFNAPI.EXPECT(). - StartExecution(gomock.Any()). + StartExecutionWithContext(gomock.Any(), gomock.Any()). Return(&sfn.StartExecutionOutput{}, nil) c.mockSQSAPI.EXPECT(). SendMessageWithContext(gomock.Any(), gomock.Any()). @@ -413,7 +413,7 @@ func TestCancelWorkflow(t *testing.T) { reason := "i have my reasons" sfnExecutionARN := c.manager.executionArn(workflow, c.workflowDefinition) c.mockSFNAPI.EXPECT(). - StopExecution(&sfn.StopExecutionInput{ + StopExecutionWithContext(gomock.Any(), &sfn.StopExecutionInput{ ExecutionArn: aws.String(sfnExecutionARN), Cause: aws.String(reason), }). From a38f122edf923728c8fc6d1215ee265c2303ea98 Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Mon, 2 Sep 2019 20:50:43 -0700 Subject: [PATCH 4/5] Normalize wf-id -> workflow-id --- executor/workflow_manager.go | 2 +- executor/workflow_manager_sfn.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/workflow_manager.go b/executor/workflow_manager.go index 660c4206..1d2e9bae 100644 --- a/executor/workflow_manager.go +++ b/executor/workflow_manager.go @@ -116,7 +116,7 @@ func updatePendingWorkflow(ctx context.Context, m *sqs.Message, wm WorkflowManag } wfID := *m.Body - span.SetTag("wf-id", wfID) + span.SetTag("workflow-id", wfID) wf, err := thestore.GetWorkflowByID(ctx, wfID) if err != nil { if _, ok := err.(models.NotFound); ok { diff --git a/executor/workflow_manager_sfn.go b/executor/workflow_manager_sfn.go index b1b78f28..e887f7b5 100644 --- a/executor/workflow_manager_sfn.go +++ b/executor/workflow_manager_sfn.go @@ -227,7 +227,7 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work // i.e. execution was started but we failed to save workflow // If we fail starting the execution, we can resolve this out of band (TODO: should support cancelling) workflow := resources.NewWorkflow(&wd, input, namespace, queue, mergedTags) - logger.FromContext(ctx).AddContext("wf-id", workflow.ID) + logger.FromContext(ctx).AddContext("workflow-id", workflow.ID) if err := wm.store.SaveWorkflow(ctx, *workflow); err != nil { return nil, err } From 44ae6864619f65eba34060bc2ed66ca0df1455ae Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Tue, 3 Sep 2019 10:49:35 -0700 Subject: [PATCH 5/5] Bump minor version --- docs/overview.md | 2 +- gen-js/package.json | 2 +- swagger.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index e846f50c..192f8645 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions ### Version information -*Version* : 0.10.1 +*Version* : 0.11.0 ### URI scheme diff --git a/gen-js/package.json b/gen-js/package.json index 8870bbcb..b8253b15 100644 --- a/gen-js/package.json +++ b/gen-js/package.json @@ -1,6 +1,6 @@ { "name": "workflow-manager", - "version": "0.10.1", + "version": "0.11.0", "description": "Orchestrator for AWS Step Functions", "main": "index.js", "dependencies": { diff --git a/swagger.yml b/swagger.yml index e49ca68f..903743fe 100644 --- a/swagger.yml +++ b/swagger.yml @@ -4,7 +4,7 @@ info: description: Orchestrator for AWS Step Functions # when changing the version here, make sure to # re-run `make generate` to generate clients and server - version: 0.10.1 + version: 0.11.0 x-npm-package: workflow-manager schemes: - http