diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index c34083c819..8c48ec702c 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -86,11 +86,14 @@ services: image: temporalio/admin-tools:1.22 stdin_open: true tty: true + entrypoint: ["bash", "/etc/temporal/entrypoint.sh"] healthcheck: test: ["CMD", "tctl", "workflow", "list"] interval: 1s timeout: 5s retries: 30 + volumes: + - ./mirror-name-search.sh:/etc/temporal/entrypoint.sh temporal-ui: container_name: temporal-ui diff --git a/docker-compose.yml b/docker-compose.yml index 4b445e29c3..523dcb1389 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,11 +73,14 @@ services: image: temporalio/admin-tools:1.22 stdin_open: true tty: true + entrypoint: ["bash", "/etc/temporal/entrypoint.sh"] healthcheck: test: ["CMD", "tctl", "workflow", "list"] interval: 1s timeout: 5s retries: 30 + volumes: + - ./mirror-name-search.sh:/etc/temporal/entrypoint.sh temporal-ui: container_name: temporal-ui diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 8cd4951787..aafad6b255 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -129,6 +129,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + "MirrorName": cfg.FlowJobName, + }, } maxBatchSize := int(cfg.MaxBatchSize) @@ -229,6 +232,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + "MirrorName": cfg.FlowJobName, + }, } if req.CreateCatalogEntry { err := h.createQrepJobEntry(ctx, req, workflowID) @@ -311,6 +317,9 @@ func (h *FlowRequestHandler) ShutdownFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + "MirrorName": req.FlowJobName, + }, } dropFlowHandle, err := h.temporalClient.ExecuteWorkflow( ctx, // context diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index bbf134bf09..9d0aae108a 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -189,6 +189,10 @@ func CDCFlowWorkflowWithConfig( } } + mirrorNameSearch := map[string]interface{}{ + "MirrorName": cfg.FlowJobName, + } + // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) @@ -201,6 +205,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -226,7 +231,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -323,6 +329,9 @@ func CDCFlowWorkflowWithConfig( return state, err } + mirrorNameSearch := map[string]interface{}{ + "MirrorName": cfg.FlowJobName, + } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -330,6 +339,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping @@ -362,6 +372,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 3b8e77a686..c8a7c1023f 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -206,6 +206,9 @@ func (q *QRepFlowExecution) startChildWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: map[string]interface{}{ + "MirrorName": q.config.FlowJobName, + }, }) future := workflow.ExecuteChildWorkflow( diff --git a/mirror-name-search.sh b/mirror-name-search.sh new file mode 100644 index 0000000000..c4f738a67a --- /dev/null +++ b/mirror-name-search.sh @@ -0,0 +1,7 @@ +# Check if MirrorName attribute exists +if ! temporal operator search-attribute list | grep -w MirrorName >/dev/null 2>&1; then + # If not, create MirrorName attribute + temporal operator search-attribute create --name MirrorName --type Text +fi + +tini -- sleep infinity