diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index ec89504fd9..2c0dd3e6cd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -112,24 +112,23 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlow } } -func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) (T, error) { +func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T { sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { return f(ctx) }) var result T err := sideEffect.Get(&result) - return result, err + if err != nil { + panic(err) + } + return result } -func GetUUID(ctx workflow.Context) (string, error) { - uuid, err := GetSideEffect(ctx, func(_ workflow.Context) string { +func GetUUID(ctx workflow.Context) string { + return GetSideEffect(ctx, func(_ workflow.Context) string { return uuid.New().String() }) - if err != nil { - return "", fmt.Errorf("failed to generate UUID: %w", err) - } - return uuid, nil } func GetChildWorkflowID( @@ -168,15 +167,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } - additionalTablesUUID, err := GetUUID(ctx) - if err != nil { - return err - } + additionalTablesUUID := GetUUID(ctx) childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID) - if err != nil { - return err - } - additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesCfg.DoInitialSnapshot = true additionalTablesCfg.InitialSnapshotOnly = true @@ -410,7 +402,7 @@ func CDCFlowWorkflowWithConfig( ) var normWaitChan model.TypedReceiveChannel[struct{}] - parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool { + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { return peerdbenv.PeerDBEnableParallelSyncNormalize() }) if !parallel { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index db1321ce32..e854226ff7 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -47,7 +47,7 @@ func NormalizeFlowWorkflow( tableNameSchemaMapping = s.TableNameSchemaMapping }) - parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool { + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { return peerdbenv.PeerDBEnableParallelSyncNormalize() }) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 9a1468e6a1..64aa42277d 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -201,24 +201,17 @@ func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context, } // getPartitionWorkflowID returns the child workflow ID for a new sync flow. -func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string, error) { - id, err := GetUUID(ctx) - if err != nil { - return "", fmt.Errorf("failed to get child workflow ID: %w", err) - } - - return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id), nil +func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) string { + id := GetUUID(ctx) + return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id) } // startChildWorkflow starts a single child workflow. func (q *QRepFlowExecution) startChildWorkflow( ctx workflow.Context, partitions *protos.QRepPartitionBatch, -) (workflow.ChildWorkflowFuture, error) { - wid, err := q.getPartitionWorkflowID(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get child workflow ID: %w", err) - } +) workflow.ChildWorkflowFuture { + wid := q.getPartitionWorkflowID(ctx) partFlowCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ WorkflowID: wid, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -230,10 +223,7 @@ func (q *QRepFlowExecution) startChildWorkflow( }, }) - future := workflow.ExecuteChildWorkflow( - partFlowCtx, QRepPartitionWorkflow, q.config, partitions, q.runUUID) - - return future, nil + return workflow.ExecuteChildWorkflow(partFlowCtx, QRepPartitionWorkflow, q.config, partitions, q.runUUID) } // processPartitions handles the logic for processing the partitions. @@ -256,11 +246,7 @@ func (q *QRepFlowExecution) processPartitions( Partitions: parts, BatchId: int32(i + 1), } - future, err := q.startChildWorkflow(ctx, batch) - if err != nil { - return fmt.Errorf("failed to start child workflow: %w", err) - } - + future := q.startChildWorkflow(ctx, batch) q.childPartitionWorkflows = append(q.childPartitionWorkflows, future) } diff --git a/flow/workflows/scheduled_flows.go b/flow/workflows/scheduled_flows.go index 7dac9e3023..4cf66d3ff4 100644 --- a/flow/workflows/scheduled_flows.go +++ b/flow/workflows/scheduled_flows.go @@ -6,6 +6,8 @@ import ( "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/peerdbenv" ) // RecordSlotSizeWorkflow monitors replication slot size @@ -46,13 +48,18 @@ func withCronOptions(ctx workflow.Context, workflowID string, cron string) workf func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { info := workflow.GetInfo(ctx) - heartbeatCtx := withCronOptions(ctx, - fmt.Sprintf("wal-heartbeat-%s", info.OriginalRunID), - "*/12 * * * *") - workflow.ExecuteChildWorkflow( - heartbeatCtx, - HeartbeatFlowWorkflow, - ) + walHeartbeatEnabled := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableWALHeartbeat() + }) + if walHeartbeatEnabled { + heartbeatCtx := withCronOptions(ctx, + fmt.Sprintf("wal-heartbeat-%s", info.OriginalRunID), + "*/12 * * * *") + workflow.ExecuteChildWorkflow( + heartbeatCtx, + HeartbeatFlowWorkflow, + ) + } slotSizeCtx := withCronOptions(ctx, fmt.Sprintf("record-slot-size-%s", info.OriginalRunID),