Skip to content

Commit

Permalink
remove search attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 24, 2023
1 parent 0fd28a0 commit 89bbca2
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 19 deletions.
3 changes: 0 additions & 3 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,6 @@ func (h *FlowRequestHandler) ShutdownFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
SearchAttributes: map[string]interface{}{
"MirrorName": req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
14 changes: 1 addition & 13 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,6 @@ func CDCFlowWorkflowWithConfig(
}
}

searchAttributes := map[string]interface{}{
"MirrorName": cfg.FlowJobName,
}

// start the SetupFlow workflow as a child workflow, and wait for it to complete
// it should return the table schema for the source peer
setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName)
Expand All @@ -205,7 +201,6 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: searchAttributes,
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
Expand All @@ -225,8 +220,7 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TaskQueue: shared.SnapshotFlowTaskQueue,
SearchAttributes: searchAttributes,
TaskQueue: shared.SnapshotFlowTaskQueue,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
Expand Down Expand Up @@ -323,18 +317,13 @@ func CDCFlowWorkflowWithConfig(
return state, err
}

searchAttributes := map[string]interface{}{
"MirrorName": cfg.FlowJobName,
}

// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: searchAttributes,
}
ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping
Expand Down Expand Up @@ -367,7 +356,6 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: searchAttributes,
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

Expand Down
3 changes: 0 additions & 3 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ func (q *QRepFlowExecution) startChildWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: map[string]interface{}{
"MirrorName": q.config.FlowJobName,
},
})

future := workflow.ExecuteChildWorkflow(
Expand Down

0 comments on commit 89bbca2

Please sign in to comment.