Skip to content

Commit

Permalink
Support for mirror name filter (#731)
Browse files Browse the repository at this point in the history
Fixes #696 
We can now filter workflows by mirror name in Temporal UI

<img width="1717" alt="Screenshot 2023-11-28 at 11 42 12 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/a625200d-acf6-4371-84cb-a685e6490b33">
  • Loading branch information
Amogh-Bharadwaj authored Nov 28, 2023
1 parent eaf864c commit a4c028c
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
9 changes: 9 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}

maxBatchSize := int(cfg.MaxBatchSize)
Expand Down Expand Up @@ -229,6 +232,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}
if req.CreateCatalogEntry {
err := h.createQrepJobEntry(ctx, req, workflowID)
Expand Down Expand Up @@ -311,6 +317,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
2 changes: 2 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const (
CDCFlowSignalName = "peer-flow-signal"
)

const MirrorNameSearchAttribute = "MirrorName"

type CDCFlowSignal int64
type ContextKey string

Expand Down
13 changes: 12 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func CDCFlowWorkflowWithConfig(
}
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

// start the SetupFlow workflow as a child workflow, and wait for it to complete
// it should return the table schema for the source peer
setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName)
Expand All @@ -201,6 +205,7 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
Expand All @@ -226,7 +231,8 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TaskQueue: taskQueue,
TaskQueue: taskQueue,
SearchAttributes: mirrorNameSearch,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
Expand Down Expand Up @@ -323,13 +329,17 @@ func CDCFlowWorkflowWithConfig(
return state, err
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}
// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping
Expand Down Expand Up @@ -362,6 +372,7 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

Expand Down
3 changes: 3 additions & 0 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (q *QRepFlowExecution) startChildWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: q.config.FlowJobName,
},
})

future := workflow.ExecuteChildWorkflow(
Expand Down
7 changes: 7 additions & 0 deletions scripts/mirror-name-search.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Check if MirrorName attribute exists
if ! temporal operator search-attribute list | grep -w MirrorName >/dev/null 2>&1; then
# If not, create MirrorName attribute
temporal operator search-attribute create --name MirrorName --type Text
fi

tini -- sleep infinity

0 comments on commit a4c028c

Please sign in to comment.