diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index c1d4f299bc..6d5d68f8fd 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -14,6 +14,7 @@ import (
 "github.com/stretchr/testify/require"
 "go.temporal.io/sdk/testsuite"
 "go.temporal.io/sdk/worker"
+ "go.temporal.io/sdk/workflow"
 "github.com/PeerDB-io/peer-flow/connectors/utils"
 "github.com/PeerDB-io/peer-flow/e2e"
@@ -1111,7 +1112,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
 dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
 sentPause := false
- isPaused := false
 sentUpdate := false
 _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
@@ -1160,13 +1160,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 }
 getWorkflowState := func() peerflow.CDCFlowWorkflowState {
- var workflowState peerflow.CDCFlowWorkflowState
+ var state peerflow.CDCFlowWorkflowState
 val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
 e2e.EnvNoError(s.t, env, err)
- err = val.Get(&workflowState)
+ err = val.Get(&state)
 e2e.EnvNoError(s.t, env, err)
- return workflowState
+ return state
 }
 getFlowStatus := func() protos.FlowStatus {
@@ -1179,15 +1179,61 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 return flowStatus
 }
+ var workflowState peerflow.CDCFlowWorkflowState
+
 // signals in tests are weird, you need to register them before starting the workflow
 // otherwise you guessed it, errors out. really don't like this.
 // too short of a gap between signals also causes issues
 // might have something to do with how test workflows handle fast-forwarding time.
 env.RegisterDelayedCallback(func() {
+ // capture the pre-pause state; it seeds the second workflow run below
+ workflowState = getWorkflowState()
 e2e.EnvSignalWorkflow(env, model.FlowSignal, model.PauseSignal)
 s.t.Log("Sent pause signal")
 sentPause = true
 }, 28*time.Second)
+
+ // add before to test initial load too.
+ addRows(18)
+ go func() {
+ e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+ // insert 18 rows into the source tables, exactly 3 batches
+ addRows(18)
+
+ e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+ return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+ })
+
+ workflowState = getWorkflowState()
+ assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+ assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
+ assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
+ assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
+ assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
+ // we have limited batch size to 6, so at least 3 syncs needed
+ assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
+
+ if !s.t.Failed() {
+ // wait for first RegisterDelayedCallback to hit.
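+ // (EnvWaitFor polls its condition while the test environment fast-forwards
+ // workflow time; the addRows(1) in the poll below keeps change data flowing
+ // so the mirror finishes one more sync before the pause lands.)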
+ e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
+ // add 1 more row while pausing - guarantees another sync finishes
+ addRows(1)
+
+ return sentPause
+ })
+ } else {
+ env.CancelWorkflow()
+ }
+ }()
+
+ env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
+ err = env.GetWorkflowError()
+ if !workflow.IsContinueAsNewError(err) {
+ // the pause should surface as a ContinueAsNewError; otherwise fail
+ // either way: NoError reports a real error, Error flags a clean exit
+ require.NoError(s.t, err)
+ require.Error(s.t, err)
+ }
+ workflowState.ActiveSignal = model.PauseSignal
+ env = e2e.NewTemporalTestWorkflowEnvironment(s.t)
+
 // this signal being sent also unblocks another WaitFor
 env.RegisterDelayedCallback(func() {
 e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool {
@@ -1195,7 +1241,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 flowStatus := getFlowStatus()
 if flowStatus != protos.FlowStatus_STATUS_PAUSED {
 return false
 }
- isPaused = true
 e2e.EnvSignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
 IdleTimeout: 14,
 BatchSize: 12,
@@ -1210,7 +1255,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 sentUpdate = true
 return true
 })
- }, 56*time.Second)
+ }, 28*time.Second)
 env.RegisterDelayedCallback(func() {
 e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
 if !sentUpdate {
@@ -1220,75 +1265,44 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 return false
 }
 e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
 s.t.Log("Sent resume signal")
 return true
 })
- }, 84*time.Second)
+ }, 56*time.Second)

- // add before to test initial load too.
- addRows(18)
 go func() {
- defer env.CancelWorkflow()
- e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
- // insert 18 rows into the source tables, exactly 3 batches
- addRows(18)
-
- e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+ e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
 return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
 })
- workflowState := getWorkflowState()
- assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
- assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
- assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
- assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
- assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
- // we have limited batch size to 6, so atleast 3 syncs needed
- assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
-
- if !s.t.Failed() {
- // wait for first RegisterDelayedCallback to hit.
- e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
- return sentPause
- })
- e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
- // keep adding 1 more row - guarantee finishing another sync
- addRows(1)
- // isPaused - set from the WaitFor that sends update signal
- return isPaused
- })
- e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
- return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
- })
-
- // we have a paused mirror, wait for second signal to hit.
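+ // we have a paused mirror restored from workflowState; wait for the
+ // second RegisterDelayedCallback to confirm the update signal landed.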
+ e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { + return sentUpdate + }) - // add rows to both tables before resuming - should handle - addRows(18) + // add rows to both tables before resuming - should handle + addRows(18) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { - return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { - return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil - }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { + return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil + }) - workflowState = getWorkflowState() - assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) - assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) - assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) - // 3 from first insert of 18 rows in 1 table - // 1 from pre-pause - // 3 from second insert of 18 rows in 2 tables, batch size updated - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) - } + workflowState = getWorkflowState() + assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) + // 3 from first insert of 18 rows in 1 table + // 1 from pre-pause + // 3 from second insert of 18 rows in 2 tables, batch size updated + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, &workflowState) e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/model/signals.go b/flow/model/signals.go index 3a6d536d26..5e30defd63 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -130,6 +130,22 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{ Name: "cdc-dynamic-properties", } +var SyncStopSignal = TypedSignal[struct{}]{ + Name: "sync-stop", +} + +var SyncErrorSignal = TypedSignal[string]{ + Name: "sync-error", +} + +var SyncResultSignal = TypedSignal[SyncResponse]{ + Name: "sync-result", +} + +var SyncOptionsSignal = TypedSignal[*protos.SyncFlowOptions]{ + Name: "sync-options", +} + var NormalizeSignal = TypedSignal[NormalizePayload]{ Name: "normalize", } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 398a09d47c..3404bbfa25 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -21,15 +21,11 @@ import ( 
"github.com/PeerDB-io/peer-flow/shared" ) -const ( - maxSyncFlowsPerCDCFlow = 32 -) - type CDCFlowWorkflowState struct { // Progress events for the peer flow. Progress []string // Accumulates status for sync flows spawned. - SyncFlowStatuses []*model.SyncResponse + SyncFlowStatuses []model.SyncResponse // Accumulates status for normalize flows spawned. NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. @@ -57,8 +53,8 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow return &CDCFlowWorkflowState{ Progress: []string{"started"}, // 1 more than the limit of 10 - SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), - NormalizeFlowStatuses: nil, + SyncFlowStatuses: make([]model.SyncResponse, 0, 11), + NormalizeFlowStatuses: make([]model.NormalizeResponse, 0, 11), ActiveSignal: model.NoopSignal, SyncFlowErrors: nil, NormalizeFlowErrors: nil, @@ -102,6 +98,8 @@ func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { type CDCFlowWorkflowExecution struct { flowExecutionID string logger log.Logger + syncFlowFuture workflow.ChildWorkflowFuture + normFlowFuture workflow.ChildWorkflowFuture } // NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. @@ -142,6 +140,10 @@ func GetChildWorkflowID( // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. type CDCFlowWorkflowResult = CDCFlowWorkflowState +const ( + maxSyncsPerCdcFlow = 60 +) + func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, @@ -207,6 +209,54 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return nil } +func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( + ctx workflow.Context, + selector workflow.Selector, + state *CDCFlowWorkflowState, +) { + // add a signal to change CDC properties + cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) + cdcPropertiesSignalChan.AddToSelector(selector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { + // only modify for options since SyncFlow uses it + if cdcConfigUpdate.BatchSize > 0 { + state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout + } + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } + + if w.syncFlowFuture != nil { + _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) + } + + w.logger.Info("CDC Signal received. 
Parameters on signal reception:",
+ slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
+ slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
+ slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
+ })
+}
+
+func (w *CDCFlowWorkflowExecution) startSyncFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions) {
+ w.syncFlowFuture = workflow.ExecuteChildWorkflow(
+ ctx,
+ SyncFlowWorkflow,
+ config,
+ options,
+ )
+}
+
+func (w *CDCFlowWorkflowExecution) startNormFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs) {
+ w.normFlowFuture = workflow.ExecuteChildWorkflow(
+ ctx,
+ NormalizeFlowWorkflow,
+ config,
+ nil,
+ )
+}
+
 func CDCFlowWorkflow(
 ctx workflow.Context,
 cfg *protos.FlowConnectionConfigs,
@@ -221,6 +271,7 @@ func CDCFlowWorkflow(
 }
 w := NewCDCFlowWorkflowExecution(ctx, cfg.FlowJobName)
+ flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)
 err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
 return *state, nil
@@ -241,10 +292,42 @@ func CDCFlowWorkflow(
 if err != nil {
 return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err)
 }
+
 mirrorNameSearch := map[string]interface{}{
 shared.MirrorNameSearchAttribute: cfg.FlowJobName,
 }
+ if state.ActiveSignal == model.PauseSignal {
+ selector := workflow.NewNamedSelector(ctx, "PauseLoop")
+ selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
+ flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
+ state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
+ })
+ w.addCdcPropertiesSignalListener(ctx, selector, state)
+
+ startTime := workflow.Now(ctx)
+ state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
+
+ for state.ActiveSignal == model.PauseSignal {
+ // only place we block on receive, so signal processing is immediate
+ for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil {
+ w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
+ selector.Select(ctx)
+ }
+ if err := ctx.Err(); err != nil {
+ return state, err
+ }
+
+ err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
+ if err != nil {
+ return state, err
+ }
+ }
+
+ w.logger.Info("mirror has been resumed", slog.Any("duration", time.Since(startTime)))
+ state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
+ }
+
 originalRunID := workflow.GetInfo(ctx).OriginalRunID
 // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled
@@ -356,40 +439,23 @@ func CDCFlowWorkflow(
 }
 }
- sessionOptions := &workflow.SessionOptions{
- CreationTimeout: 5 * time.Minute,
- ExecutionTimeout: 144 * time.Hour,
- HeartbeatTimeout: time.Minute,
- }
- syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
- if err != nil {
- return nil, err
- }
- defer workflow.CompleteSession(syncSessionCtx)
- sessionInfo := workflow.GetSessionInfo(syncSessionCtx)
-
- syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
- StartToCloseTimeout: 14 * 24 * time.Hour,
- HeartbeatTimeout: time.Minute,
- WaitForCancellation: true,
- })
- fMaintain := workflow.ExecuteActivity(
- syncCtx,
- flowable.MaintainPull,
- cfg,
- sessionInfo.SessionID,
- )
+ syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID)
+ normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)

- currentSyncFlowNum := 0
- 
totalRecordsSynced := int64(0) + var restart, finished bool + syncCount := 0 - var canceled bool - mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") - mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { - canceled = true - }) + syncFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: syncFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, + } + syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts) - normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID) normalizeFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: normalizeFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -400,49 +466,94 @@ func CDCFlowWorkflow( WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts) - normalizeFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil) - var waitSelector workflow.Selector - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) - if !parallel { - waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") - waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { - canceled = true - }) - waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) - waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {}) + handleError := func(name string, err error) { + var panicErr *temporal.PanicError + if errors.As(err, &panicErr) { + w.logger.Error( + "panic in flow", + slog.String("name", name), + slog.Any("error", panicErr.Error()), + slog.String("stack", panicErr.StackTrace()), + ) + } else { + w.logger.Error("error in flow", slog.String("name", name), slog.Any("error", err)) + } } - finishNormalize := func() { - model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{ - Done: true, - SyncBatchID: -1, - }) - if err := normalizeFlowFuture.Get(ctx, nil); err != nil { - w.logger.Error("failed to execute normalize flow", slog.Any("error", err)) - var panicErr *temporal.PanicError - if errors.As(err, &panicErr) { - w.logger.Error("PANIC", panicErr.Error(), panicErr.StackTrace()) - } - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } + finishSyncNormalize := func() { + restart = true + _ = model.SyncStopSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, struct{}{}).Get(ctx, nil) } - mainLoopSelector.AddFuture(fMaintain, func(f workflow.Future) { + mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") + mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + + var handleNormFlow, handleSyncFlow func(workflow.Future) + handleSyncFlow = func(f workflow.Future) { err := f.Get(ctx, nil) if err != nil { - w.logger.Error("MaintainPull failed", slog.Any("error", err)) - canceled = true + handleError("sync", err) + state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } - }) - flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) + if restart { + w.logger.Info("sync finished, finishing normalize") + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, model.NormalizePayload{ + Done: true, + SyncBatchID: -1, + }).Get(ctx, nil) + } else { + w.logger.Warn("sync flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + w.startSyncFlow(syncCtx, cfg, 
state.SyncFlowOptions) + mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow) + } + } + handleNormFlow = func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + handleError("normalize", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } + + if restart { + w.logger.Info("normalize finished") + finished = true + } else { + w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + w.startNormFlow(normCtx, cfg) + mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow) + } + } + + w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions) + mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow) + + w.startNormFlow(normCtx, cfg) + mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow) + flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) }) + syncErrorChan := model.SyncErrorSignal.GetSignalChannel(ctx) + syncErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { + syncCount += 1 + state.SyncFlowErrors = append(state.SyncFlowErrors, err) + }) + syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx) + syncResultChan.AddToSelector(mainLoopSelector, func(result model.SyncResponse, _ bool) { + syncCount += 1 + if state.SyncFlowOptions.RelationMessageMapping == nil { + state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping + } else { + maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping) + } + state.SyncFlowStatuses = append(state.SyncFlowStatuses, result) + }) + normErrorChan := model.NormalizeErrorSignal.GetSignalChannel(ctx) normErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err) @@ -453,161 +564,48 @@ func CDCFlowWorkflow( state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, result) }) - // add a signal to change CDC properties - cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) - cdcPropertiesSignalChan.AddToSelector(mainLoopSelector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { - // only modify for options since SyncFlow uses it - if cdcConfigUpdate.BatchSize > 0 { - state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize - } - if cdcConfigUpdate.IdleTimeout > 0 { - state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout - } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } + normChan := model.NormalizeSignal.GetSignalChannel(ctx) + normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) { + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, payload).Get(ctx, nil) + maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping) + }) - w.logger.Info("CDC Signal received. 
Parameters on signal reception:", - slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), - slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() }) + if !parallel { + normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { + _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + }) + } + w.addCdcPropertiesSignalListener(ctx, mainLoopSelector, state) + + state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING for { - for !canceled && mainLoopSelector.HasPending() { + mainLoopSelector.Select(ctx) + for ctx.Err() == nil && mainLoopSelector.HasPending() { mainLoopSelector.Select(ctx) } - if canceled { - break + if err := ctx.Err(); err != nil { + w.logger.Info("mirror canceled", slog.Any("error", err)) + return state, err } - if state.ActiveSignal == model.PauseSignal { - startTime := workflow.Now(ctx) - state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED + if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow { + finishSyncNormalize() + } - for state.ActiveSignal == model.PauseSignal { - w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) - // only place we block on receive, so signal processing is immediate + if restart { + for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { mainLoopSelector.Select(ctx) - if state.ActiveSignal == model.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err - } - } } - - w.logger.Info("mirror has been resumed after ", time.Since(startTime)) - } - - state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING - - // check if total sync flows have been completed - // since this happens immediately after we check for signals, the case of a signal being missed - // due to a new workflow starting is vanishingly low, but possible - if currentSyncFlowNum == maxSyncFlowsPerCDCFlow { - w.logger.Info("All the syncflows have completed successfully, there was a"+ - " limit on the number of syncflows to be executed: ", currentSyncFlowNum) - break - } - currentSyncFlowNum += 1 - w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) - - // execute the sync flow - syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 72 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - - w.logger.Info("executing sync flow") - syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID) - - var syncDone, syncErr bool - mustWait := waitSelector != nil - mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { - syncDone = true - - var childSyncFlowRes *model.SyncResponse - if err := f.Get(ctx, &childSyncFlowRes); err != nil { - w.logger.Error("failed to execute sync flow", slog.Any("error", err)) - state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) - syncErr = true - mustWait = false - } else if childSyncFlowRes != nil { - state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) - state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping - totalRecordsSynced += 
childSyncFlowRes.NumRecordsSynced - w.logger.Info("Total records synced: ", - slog.Int64("totalRecordsSynced", totalRecordsSynced)) - - tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. - if tableSchemaDeltasCount != 0 { - modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) - modifiedDstTables := make([]string, 0, tableSchemaDeltasCount) - for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) - } - - getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, - &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: cfg.Source, - TableIdentifiers: modifiedSrcTables, - FlowName: cfg.FlowJobName, - }) - - var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput - if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { - w.logger.Error("failed to execute schema update at source: ", err) - state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) - } else { - for i, srcTable := range modifiedSrcTables { - dstTable := modifiedDstTables[i] - state.SyncFlowOptions.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] - } - } - } - - err := model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{ - Done: false, - SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping, - }).Get(ctx, nil) - if err != nil { - w.logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err)) - mustWait = false - } - } else { - mustWait = false + if err := ctx.Err(); err != nil { + w.logger.Info("mirror canceled", slog.Any("error", err)) + return state, err } - }) - - for !syncDone && !canceled { - mainLoopSelector.Select(ctx) - } - if canceled { - break - } - if syncErr { - state.TruncateProgress(w.logger) return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } - if mustWait { - waitSelector.Select(ctx) - } - } - - finishNormalize() - state.TruncateProgress(w.logger) - if err := ctx.Err(); err != nil { - return nil, err } - - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 057e4c11a2..5c87b8d05b 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -78,61 +78,60 @@ func NormalizeFlowWorkflow( state.Wait = false }) - for { - for state.Wait && ctx.Err() == nil { - selector.Select(ctx) - } - if ProcessLoop(ctx, logger, selector, state) { - return ctx.Err() - } + for state.Wait && ctx.Err() == nil { + selector.Select(ctx) + } + if ProcessLoop(ctx, logger, selector, state) { + return ctx.Err() + } - if state.LastSyncBatchID != state.SyncBatchID { - state.LastSyncBatchID = state.SyncBatchID - - logger.Info("executing normalize") - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, - TableNameSchemaMapping: state.TableNameSchemaMapping, - SyncBatchID: state.SyncBatchID, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, 
startNormalizeInput) - - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - _ = model.NormalizeErrorSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - err.Error(), - ).Get(ctx, nil) - } else if normalizeResponse != nil { - _ = model.NormalizeResultSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - *normalizeResponse, - ).Get(ctx, nil) - } - } + if state.LastSyncBatchID != state.SyncBatchID { + state.LastSyncBatchID = state.SyncBatchID - if !state.Stop { - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) - - if !parallel { - _ = model.NormalizeDoneSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - struct{}{}, - ).Get(ctx, nil) - } + logger.Info("executing normalize") + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: config, + TableNameSchemaMapping: state.TableNameSchemaMapping, + SyncBatchID: state.SyncBatchID, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + _ = model.NormalizeErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + err.Error(), + ).Get(ctx, nil) + } else if normalizeResponse != nil { + _ = model.NormalizeResultSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + *normalizeResponse, + ).Get(ctx, nil) } + } - state.Wait = true - if ProcessLoop(ctx, logger, selector, state) { - return ctx.Err() + if !state.Stop { + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + + if !parallel { + _ = model.NormalizeDoneSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + struct{}{}, + ).Get(ctx, nil) } } + + state.Wait = true + if ProcessLoop(ctx, logger, selector, state) { + return ctx.Err() + } + return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state) } diff --git a/flow/workflows/register.go b/flow/workflows/register.go index fce5e92989..44c7cb00be 100644 --- a/flow/workflows/register.go +++ b/flow/workflows/register.go @@ -9,6 +9,7 @@ func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) { w.RegisterWorkflow(DropFlowWorkflow) w.RegisterWorkflow(NormalizeFlowWorkflow) w.RegisterWorkflow(SetupFlowWorkflow) + w.RegisterWorkflow(SyncFlowWorkflow) w.RegisterWorkflow(QRepFlowWorkflow) w.RegisterWorkflow(QRepPartitionWorkflow) w.RegisterWorkflow(XminFlowWorkflow) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8d2ee29d74..fd8f539083 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -299,7 +299,7 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon var sessionError error var snapshotName string - sessionSelector := workflow.NewNamedSelector(ctx, "Export Snapshot Setup") + sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup") sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { // MaintainTx should never exit without an error before this point sessionError = f.Get(exportCtx, nil) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go new file mode 100644 index 0000000000..87af1d1d82 --- /dev/null +++ b/flow/workflows/sync_flow.go @@ -0,0 +1,211 @@ +package peerflow + +import ( + "log/slog" + "time" + + 
"go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared" +) + +const ( + maxSyncsPerSyncFlow = 72 +) + +func SyncFlowWorkflow( + ctx workflow.Context, + config *protos.FlowConnectionConfigs, + options *protos.SyncFlowOptions, +) error { + parent := workflow.GetInfo(ctx).ParentWorkflowExecution + logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) + + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 144 * time.Hour, + HeartbeatTimeout: time.Minute, + } + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return err + } + defer workflow.CompleteSession(syncSessionCtx) + sessionInfo := workflow.GetSessionInfo(syncSessionCtx) + + syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain := workflow.ExecuteActivity( + syncCtx, + flowable.MaintainPull, + config, + sessionInfo.SessionID, + ) + + var stop, syncErr bool + currentSyncFlowNum := 0 + totalRecordsSynced := int64(0) + + selector := workflow.NewNamedSelector(ctx, "SyncLoop") + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + selector.AddFuture(fMaintain, func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + logger.Error("MaintainPull failed", slog.Any("error", err)) + syncErr = true + } + }) + + stopChan := model.SyncStopSignal.GetSignalChannel(ctx) + stopChan.AddToSelector(selector, func(_ struct{}, _ bool) { + stop = true + }) + + var waitSelector workflow.Selector + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + if !parallel { + waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") + waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {}) + stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) { + stop = true + }) + } + + for !stop && ctx.Err() == nil { + var syncDone bool + mustWait := waitSelector != nil + + // execute the sync flow + currentSyncFlowNum += 1 + logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) + + syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 72 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + + syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID) + selector.AddFuture(syncFlowFuture, func(f workflow.Future) { + syncDone = true + + var childSyncFlowRes *model.SyncResponse + if err := f.Get(ctx, &childSyncFlowRes); err != nil { + logger.Error("failed to execute sync flow", slog.Any("error", err)) + _ = model.SyncErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + err.Error(), + ).Get(ctx, nil) + syncErr = true + mustWait = false + } else if childSyncFlowRes != nil { + _ = model.SyncResultSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + *childSyncFlowRes, + ).Get(ctx, nil) + options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += 
childSyncFlowRes.NumRecordsSynced
+ logger.Info("Total records synced",
+ slog.Int64("totalRecordsSynced", totalRecordsSynced))
+
+ tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)
+
+ // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
+ if tableSchemaDeltasCount != 0 {
+ modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
+ modifiedDstTables := make([]string, 0, tableSchemaDeltasCount)
+ for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
+ modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
+ modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
+ }
+
+ getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+ StartToCloseTimeout: 5 * time.Minute,
+ })
+ getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
+ &protos.GetTableSchemaBatchInput{
+ PeerConnectionConfig: config.Source,
+ TableIdentifiers: modifiedSrcTables,
+ FlowName: config.FlowJobName,
+ })
+
+ var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
+ if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
+ logger.Error("failed to execute schema update at source", slog.Any("error", err))
+ _ = model.SyncErrorSignal.SignalExternalWorkflow(
+ ctx,
+ parent.ID,
+ "",
+ err.Error(),
+ ).Get(ctx, nil)
+ } else {
+ for i, srcTable := range modifiedSrcTables {
+ dstTable := modifiedDstTables[i]
+ options.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
+ }
+ }
+ }
+
+ err := model.NormalizeSignal.SignalExternalWorkflow(
+ ctx,
+ parent.ID,
+ "",
+ model.NormalizePayload{
+ Done: false,
+ SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
+ TableNameSchemaMapping: options.TableNameSchemaMapping,
+ },
+ ).Get(ctx, nil)
+ if err != nil {
+ logger.Error("failed to trigger normalize, skipping wait", slog.Any("error", err))
+ mustWait = false
+ }
+ } else {
+ mustWait = false
+ }
+ })
+
+ for ctx.Err() == nil && (!syncDone || selector.HasPending()) {
+ selector.Select(ctx)
+ }
+ if ctx.Err() != nil {
+ break
+ }
+
+ restart := currentSyncFlowNum >= maxSyncsPerSyncFlow || syncErr
+ if !stop && mustWait {
+ waitSelector.Select(ctx)
+ if restart {
+ // must flush selector for signals received while waiting
+ for ctx.Err() == nil && selector.HasPending() {
+ selector.Select(ctx)
+ }
+ break
+ }
+ } else if restart {
+ break
+ }
+ }
+ if err := ctx.Err(); err != nil {
+ logger.Info("sync canceled", slog.Any("error", err))
+ return err
+ } else if stop {
+ return nil
+ }
+ return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options)
+}
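
Reviewer note: the new parent/child choreography leans entirely on the `TypedSignal` helper (`GetSignalChannel`, `AddToSelector`, `SignalChildWorkflow`, `SignalExternalWorkflow`), but the diff above only shows new signal declarations in flow/model/signals.go. For orientation, here is a minimal sketch of how such a generic wrapper can be built on Temporal's untyped signal APIs, inferred purely from the call sites in this PR; the repo's actual definition may differ in detail:

```go
package model

import "go.temporal.io/sdk/workflow"

// TypedSignal pairs a signal name with the payload type carried on it,
// so senders and receivers agree on the payload at compile time.
type TypedSignal[T any] struct {
	Name string
}

// TypedReceiveChannel is a typed view over a workflow.ReceiveChannel.
type TypedReceiveChannel[T any] struct {
	Chan workflow.ReceiveChannel
}

// GetSignalChannel wraps the untyped signal channel for this signal name.
func (s TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T] {
	return TypedReceiveChannel[T]{Chan: workflow.GetSignalChannel(ctx, s.Name)}
}

// SignalChildWorkflow delivers a typed payload to a child of this workflow,
// as CDCFlowWorkflow does with SyncStopSignal and SyncOptionsSignal.
func (s TypedSignal[T]) SignalChildWorkflow(
	ctx workflow.Context,
	wf workflow.ChildWorkflowFuture,
	value T,
) workflow.Future {
	return wf.SignalChildWorkflow(ctx, s.Name, value)
}

// SignalExternalWorkflow delivers a typed payload to a workflow by ID, which
// is how SyncFlowWorkflow reports results and errors back to its parent.
func (s TypedSignal[T]) SignalExternalWorkflow(
	ctx workflow.Context,
	workflowID string,
	runID string,
	value T,
) workflow.Future {
	return workflow.SignalExternalWorkflow(ctx, workflowID, runID, s.Name, value)
}

// AddToSelector registers a typed callback. The channel is known to be ready
// inside the AddReceive callback, so ReceiveAsync cannot miss the value.
func (c TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, fn func(T, bool)) workflow.Selector {
	return selector.AddReceive(c.Chan, func(ch workflow.ReceiveChannel, more bool) {
		var value T
		if !ch.ReceiveAsync(&value) {
			panic("signal channel selected but empty")
		}
		fn(value, more)
	})
}
```

Read this way, the shutdown handshake in cdc_flow.go is: a pause signal or `syncCount` reaching `maxSyncsPerCdcFlow` (60) triggers `finishSyncNormalize`, which sends `SyncStopSignal` to the sync child; `handleSyncFlow` then forwards a `Done` NormalizePayload to the normalize child; `handleNormFlow` sets `finished`, the main loop drains the selector, and the parent continues-as-new with the accumulated state. A payload-type mismatch at any of these hops becomes a compile error rather than a runtime deserialization failure.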