diff --git a/flow/cmd/api.go b/flow/cmd/api.go index ae64b372be..f24178efda 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -124,11 +124,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error { return fmt.Errorf("unable to get catalog connection pool: %w", err) } - taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) - if err != nil { - return err - } - + taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue) flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue) err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue) diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index bc53785382..89680f51d9 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -53,11 +53,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } defer c.Close() - taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) - if queueErr != nil { - return queueErr - } - + taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue) w := worker.New(c, taskQueue, worker.Options{ EnableSessionWorker: true, }) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index ee9218a9da..c43515dc98 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -123,11 +123,7 @@ func WorkerMain(opts *WorkerOptions) error { slog.Info("Created temporal client") defer c.Close() - taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) - if queueErr != nil { - return queueErr - } - + taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue) w := worker.New(c, taskQueue, worker.Options{ EnableSessionWorker: true, }) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index fe8320b446..e6e982c321 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -6,10 +6,15 @@ import ( "github.com/PeerDB-io/peer-flow/peerdbenv" ) +type ( + ContextKey string + TaskQueueID string +) + const ( // Task Queues - peerFlowTaskQueue = "peer-flow-task-queue" - snapshotFlowTaskQueue = "snapshot-flow-task-queue" + PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue" + SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue" // Queries CDCFlowStateQuery = "q-cdc-flow-state" @@ -22,42 +27,20 @@ const ( const MirrorNameSearchAttribute = "MirrorName" -type ( - ContextKey string -) - const ( FlowNameKey ContextKey = "flowName" PartitionIDKey ContextKey = "partitionId" DeploymentUIDKey ContextKey = "deploymentUid" ) -type TaskQueueID int64 - -const ( - PeerFlowTaskQueueID TaskQueueID = iota - SnapshotFlowTaskQueueID TaskQueueID = iota -) - const FetchAndChannelSize = 256 * 1024 -func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { - switch taskQueueID { - case PeerFlowTaskQueueID: - return prependUIDToTaskQueueName(peerFlowTaskQueue), nil - case SnapshotFlowTaskQueueID: - return prependUIDToTaskQueueName(snapshotFlowTaskQueue), nil - default: - return "", fmt.Errorf("unknown task queue id %d", taskQueueID) - } -} - -func prependUIDToTaskQueueName(taskQueueName string) string { +func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) string { deploymentUID := peerdbenv.PeerDBDeploymentUID() if deploymentUID == "" { - return taskQueueName + return string(taskQueueID) } - return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName) + return fmt.Sprintf("%s-%s", deploymentUID, taskQueueID) } func GetDeploymentUID() string { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c9825d4e80..e0629d5233 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -371,11 +371,7 @@ func CDCFlowWorkflow( // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID) - taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) - if err != nil { - return state, err - } - + taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue) childSnapshotFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: snapshotFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index fd8f539083..a42110f769 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -105,11 +105,7 @@ func (s *SnapshotFlowExecution) cloneTable( s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s", childWorkflowID, srcName, dstName), cloneLog) - taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID) - if queueErr != nil { - return queueErr - } - + taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue) childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ WorkflowID: childWorkflowID, WorkflowTaskTimeout: 5 * time.Minute,