Skip to content

Commit

Permalink
Merge branch 'main' into remove-temporal-testsuite
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 1, 2024
2 parents 49995f0 + 54ad0ef commit b7d768e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 52 deletions.
6 changes: 1 addition & 5 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
Expand Down
6 changes: 1 addition & 5 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if queueErr != nil {
return nil, nil, queueErr
}

c, err := client.Dial(clientOptions)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
Expand Down
6 changes: 1 addition & 5 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,13 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return nil, nil, queueErr
}

c, err := client.Dial(clientOptions)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
Expand Down
37 changes: 10 additions & 27 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type (
ContextKey string
TaskQueueID string
)

const (
// Task Queues
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue"
SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue"

// Queries
CDCFlowStateQuery = "q-cdc-flow-state"
Expand All @@ -22,42 +27,20 @@ const (

const MirrorNameSearchAttribute = "MirrorName"

type (
ContextKey string
)

const (
FlowNameKey ContextKey = "flowName"
PartitionIDKey ContextKey = "partitionId"
DeploymentUIDKey ContextKey = "deploymentUid"
)

type TaskQueueID int64

const (
PeerFlowTaskQueueID TaskQueueID = iota
SnapshotFlowTaskQueueID TaskQueueID = iota
)

const FetchAndChannelSize = 256 * 1024

func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) {
switch taskQueueID {
case PeerFlowTaskQueueID:
return prependUIDToTaskQueueName(peerFlowTaskQueue), nil
case SnapshotFlowTaskQueueID:
return prependUIDToTaskQueueName(snapshotFlowTaskQueue), nil
default:
return "", fmt.Errorf("unknown task queue id %d", taskQueueID)
}
}

func prependUIDToTaskQueueName(taskQueueName string) string {
func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) string {
deploymentUID := peerdbenv.PeerDBDeploymentUID()
if deploymentUID == "" {
return taskQueueName
return string(taskQueueID)
}
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueID)
}

func GetDeploymentUID() string {
Expand Down
6 changes: 1 addition & 5 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,7 @@ func CDCFlowWorkflow(
// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID)

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if err != nil {
return state, err
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
childSnapshotFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: snapshotFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
Expand Down
6 changes: 1 addition & 5 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ func (s *SnapshotFlowExecution) cloneTable(
s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s",
childWorkflowID, srcName, dstName), cloneLog)

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: childWorkflowID,
WorkflowTaskTimeout: 5 * time.Minute,
Expand Down

0 comments on commit b7d768e

Please sign in to comment.