Skip to content

Commit

Permalink
Remove integer TaskQueueID (#1419)
Browse files Browse the repository at this point in the history
Removes need for error check
Also previously both task queues had same iota value, so that was wrong
  • Loading branch information
serprex authored Mar 1, 2024
1 parent 3f3ff9b commit 54ad0ef
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 52 deletions.
6 changes: 1 addition & 5 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
Expand Down
6 changes: 1 addition & 5 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}
defer c.Close()

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
})
Expand Down
6 changes: 1 addition & 5 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ func WorkerMain(opts *WorkerOptions) error {
slog.Info("Created temporal client")
defer c.Close()

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
})
Expand Down
37 changes: 10 additions & 27 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type (
ContextKey string
TaskQueueID string
)

const (
// Task Queues
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue"
SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue"

// Queries
CDCFlowStateQuery = "q-cdc-flow-state"
Expand All @@ -22,42 +27,20 @@ const (

const MirrorNameSearchAttribute = "MirrorName"

type (
ContextKey string
)

const (
FlowNameKey ContextKey = "flowName"
PartitionIDKey ContextKey = "partitionId"
DeploymentUIDKey ContextKey = "deploymentUid"
)

type TaskQueueID int64

const (
PeerFlowTaskQueueID TaskQueueID = iota
SnapshotFlowTaskQueueID TaskQueueID = iota
)

const FetchAndChannelSize = 256 * 1024

func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) {
switch taskQueueID {
case PeerFlowTaskQueueID:
return prependUIDToTaskQueueName(peerFlowTaskQueue), nil
case SnapshotFlowTaskQueueID:
return prependUIDToTaskQueueName(snapshotFlowTaskQueue), nil
default:
return "", fmt.Errorf("unknown task queue id %d", taskQueueID)
}
}

func prependUIDToTaskQueueName(taskQueueName string) string {
func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) string {
deploymentUID := peerdbenv.PeerDBDeploymentUID()
if deploymentUID == "" {
return taskQueueName
return string(taskQueueID)
}
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueID)
}

func GetDeploymentUID() string {
Expand Down
6 changes: 1 addition & 5 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,7 @@ func CDCFlowWorkflow(
// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID)

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if err != nil {
return state, err
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
childSnapshotFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: snapshotFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
Expand Down
6 changes: 1 addition & 5 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ func (s *SnapshotFlowExecution) cloneTable(
s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s",
childWorkflowID, srcName, dstName), cloneLog)

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: childWorkflowID,
WorkflowTaskTimeout: 5 * time.Minute,
Expand Down

0 comments on commit 54ad0ef

Please sign in to comment.