From b2b2acf80404db33ea3050bf765c130f9703f9e0 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 28 Nov 2023 21:18:40 +0530 Subject: [PATCH] Get task queues from a function based on deployment uid variable (#729) --- docker-compose-dev.yml | 1 - flow/cmd/api.go | 21 +++++++++++++++------ flow/cmd/handler.go | 18 ++++++++++-------- flow/cmd/snapshot_worker.go | 7 ++++++- flow/cmd/worker.go | 7 ++++++- flow/shared/constants.go | 28 ++++++++++++++++++++++++++-- flow/workflows/cdc_flow.go | 8 +++++++- flow/workflows/snapshot_flow.go | 7 ++++++- 8 files changed, 76 insertions(+), 21 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 2963b3ca57..af7fed1fd6 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -14,7 +14,6 @@ x-flow-worker-env: &flow-worker-env PEERDB_TEMPORAL_NAMESPACE: default # For the below 2 cert and key variables, # paste as base64 encoded strings. - # use yml multiline syntax with '|' TEMPORAL_CLIENT_CERT: TEMPORAL_CLIENT_KEY: # For GCS, these will be your HMAC keys instead diff --git a/flow/cmd/api.go b/flow/cmd/api.go index ff036fb6fb..6715ff3c55 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -62,11 +62,15 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) { return server, nil } -func killExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error { +func killExistingHeartbeatFlows( + ctx context.Context, + tc client.Client, + namespace string, + taskQueue string) error { listRes, err := tc.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ Namespace: namespace, - Query: "WorkflowType = 'HeartbeatFlowWorkflow'", + Query: "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'", }) if err != nil { return fmt.Errorf("unable to list workflows: %w", err) @@ -94,7 +98,7 @@ func APIMain(args *APIServerParams) error { certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey) if err != nil { - return fmt.Errorf("unable to process certificate and key: %w", err) + return fmt.Errorf("unable to base64 decode certificate and key: %w", err) } connOptions := client.ConnectionOptions{ @@ -115,10 +119,15 @@ func APIMain(args *APIServerParams) error { return fmt.Errorf("unable to get catalog connection pool: %w", err) } - flowHandler := NewFlowRequestHandler(tc, catalogConn) + taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) + if err != nil { + return err + } + + flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue) defer flowHandler.Close() - err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace) + err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue) if err != nil { return fmt.Errorf("unable to kill existing heartbeat flows: %w", err) } @@ -126,7 +135,7 @@ func APIMain(args *APIServerParams) error { workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, - TaskQueue: shared.PeerFlowTaskQueue, + TaskQueue: taskQueue, } _, err = flowHandler.temporalClient.ExecuteWorkflow( diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 13ce8115b1..8cd4951787 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -20,15 +20,17 @@ import ( // grpc server implementation type FlowRequestHandler struct { - temporalClient client.Client - pool *pgxpool.Pool + temporalClient client.Client + pool *pgxpool.Pool + peerflowTaskQueueID string protos.UnimplementedFlowServiceServer } -func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *FlowRequestHandler { +func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, taskQueue string) *FlowRequestHandler { return &FlowRequestHandler{ - temporalClient: temporalClient, - pool: pool, + temporalClient: temporalClient, + pool: pool, + peerflowTaskQueueID: taskQueue, } } @@ -126,7 +128,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, - TaskQueue: shared.PeerFlowTaskQueue, + TaskQueue: h.peerflowTaskQueueID, } maxBatchSize := int(cfg.MaxBatchSize) @@ -226,7 +228,7 @@ func (h *FlowRequestHandler) CreateQRepFlow( workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, - TaskQueue: shared.PeerFlowTaskQueue, + TaskQueue: h.peerflowTaskQueueID, } if req.CreateCatalogEntry { err := h.createQrepJobEntry(ctx, req, workflowID) @@ -308,7 +310,7 @@ func (h *FlowRequestHandler) ShutdownFlow( workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, - TaskQueue: shared.PeerFlowTaskQueue, + TaskQueue: h.peerflowTaskQueueID, } dropFlowHandle, err := h.temporalClient.ExecuteWorkflow( ctx, // context diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index f969f87a5b..16008cc6a5 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -43,7 +43,12 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } defer c.Close() - w := worker.New(c, shared.SnapshotFlowTaskQueue, worker.Options{ + taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) + if queueErr != nil { + return queueErr + } + + w := worker.New(c, taskQueue, worker.Options{ EnableSessionWorker: true, }) w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index c613e2ef67..adba89371e 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -118,7 +118,12 @@ func WorkerMain(opts *WorkerOptions) error { log.Info("Created temporal client") defer c.Close() - w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{}) + taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) + if queueErr != nil { + return queueErr + } + + w := worker.New(c, taskQueue, worker.Options{}) w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) w.RegisterWorkflow(peerflow.SyncFlowWorkflow) w.RegisterWorkflow(peerflow.SetupFlowWorkflow) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 9df826c32d..d1dfbdd6e1 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -1,8 +1,13 @@ package shared +import ( + "fmt" + "os" +) + const ( - PeerFlowTaskQueue = "peer-flow-task-queue" - SnapshotFlowTaskQueue = "snapshot-flow-task-queue" + peerFlowTaskQueue = "peer-flow-task-queue" + snapshotFlowTaskQueue = "snapshot-flow-task-queue" CDCFlowSignalName = "peer-flow-signal" ) @@ -17,4 +22,23 @@ const ( CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" ) +type TaskQueueID int64 + +const ( + PeerFlowTaskQueueID TaskQueueID = iota + SnapshotFlowTaskQueueID TaskQueueID = iota +) + const FetchAndChannelSize = 256 * 1024 + +func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { + deploymentUID := os.Getenv("PEERDB_DEPLOYMENT_UID") + switch taskQueueID { + case PeerFlowTaskQueueID: + return deploymentUID + "-" + peerFlowTaskQueue, nil + case SnapshotFlowTaskQueueID: + return deploymentUID + "-" + snapshotFlowTaskQueue, nil + default: + return "", fmt.Errorf("unknown task queue id %d", taskQueueID) + } +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 753cff3804..bbf134bf09 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,13 +214,19 @@ func CDCFlowWorkflowWithConfig( if err != nil { return state, err } + + taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) + if err != nil { + return state, err + } + childSnapshotFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: snapshotFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: shared.SnapshotFlowTaskQueue, + TaskQueue: taskQueue, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index eb9eac8126..1c08a40fac 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -112,10 +112,15 @@ func (s *SnapshotFlowExecution) cloneTable( }).Infof("Obtained child id %s for source table %s and destination table %s", childWorkflowID, srcName, dstName) + taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) + if queueErr != nil { + return queueErr + } + childCtx = workflow.WithChildOptions(childCtx, workflow.ChildWorkflowOptions{ WorkflowID: childWorkflowID, WorkflowTaskTimeout: 5 * time.Minute, - TaskQueue: shared.PeerFlowTaskQueue, + TaskQueue: taskQueue, }) // we know that the source is postgres as setup replication output is non-nil