From c7a14e25a921f80881e757846442397eff0b4d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 28 Feb 2024 14:37:44 +0000 Subject: [PATCH] Split cdc_flow into cdc_flow / sync_flow (#1365) Benefits: 1. sync_flow / normalize_flow restart when canceled, easy to restart broken flows 2. doesn't keep connection open during pause 3. pause loop moved to start so that upgrades that don't change state values can avoid breaking mirrors by first pausing them to avoid determinism errors in replay 4. moves towards a world where sync_flow history size can be reduced to increase syncs between reconnects (or work out `RecreateSession`) --- flow/e2e/postgres/peer_flow_pg_test.go | 146 +++++---- flow/model/signals.go | 16 + flow/workflows/cdc_flow.go | 418 ++++++++++++------------- flow/workflows/normalize_flow.go | 101 +++--- flow/workflows/register.go | 1 + flow/workflows/snapshot_flow.go | 2 +- flow/workflows/sync_flow.go | 211 +++++++++++++ 7 files changed, 567 insertions(+), 328 deletions(-) create mode 100644 flow/workflows/sync_flow.go diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index c1d4f299bc..6d5d68f8fd 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "go.temporal.io/sdk/testsuite" "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -1111,7 +1112,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst") dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst") sentPause := false - isPaused := false sentUpdate := false _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -1160,13 +1160,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { } getWorkflowState := func() peerflow.CDCFlowWorkflowState { - var workflowState peerflow.CDCFlowWorkflowState + var state peerflow.CDCFlowWorkflowState val, err := env.QueryWorkflow(shared.CDCFlowStateQuery) e2e.EnvNoError(s.t, env, err) - err = val.Get(&workflowState) + err = val.Get(&state) e2e.EnvNoError(s.t, env, err) - return workflowState + return state } getFlowStatus := func() protos.FlowStatus { @@ -1179,15 +1179,61 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { return flowStatus } + var workflowState peerflow.CDCFlowWorkflowState + // signals in tests are weird, you need to register them before starting the workflow // otherwise you guessed it, errors out. really don't like this. // too short of a gap between signals also causes issues // might have something to do with how test workflows handle fast-forwarding time. env.RegisterDelayedCallback(func() { + workflowState = getWorkflowState() e2e.EnvSignalWorkflow(env, model.FlowSignal, model.PauseSignal) s.t.Log("Sent pause signal") sentPause = true }, 28*time.Second) + + // add before to test initial load too. 
+ addRows(18) + go func() { + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + // insert 18 rows into the source tables, exactly 3 batches + addRows(18) + + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + + workflowState = getWorkflowState() + assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1) + assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1) + assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1) + // we have limited batch size to 6, so atleast 3 syncs needed + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) + + if !s.t.Failed() { + // wait for first RegisterDelayedCallback to hit. + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { + // adding 1 more row while pausing - guarantee finishing another sync + addRows(1) + + return sentPause + }) + } else { + env.CancelWorkflow() + } + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil) + err = env.GetWorkflowError() + if !workflow.IsContinueAsNewError(err) { + require.NoError(s.t, err) + require.Error(s.t, err) + } + workflowState.ActiveSignal = model.PauseSignal + env = e2e.NewTemporalTestWorkflowEnvironment(s.t) + // this signal being sent also unblocks another WaitFor env.RegisterDelayedCallback(func() { e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool { @@ -1195,7 +1241,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { if flowStatus != protos.FlowStatus_STATUS_PAUSED { return false } - isPaused = true e2e.EnvSignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ IdleTimeout: 14, BatchSize: 12, @@ -1210,7 +1255,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { sentUpdate = true return true }) - }, 56*time.Second) + }, 28*time.Second) env.RegisterDelayedCallback(func() { e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { if !sentUpdate { @@ -1220,75 +1265,44 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { s.t.Log("Sent resume signal") return true }) - }, 84*time.Second) + }, 56*time.Second) - // add before to test initial load too. 
- addRows(18) go func() { - defer env.CancelWorkflow() - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 18 rows into the source tables, exactly 3 batches - addRows(18) - - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil }) - workflowState := getWorkflowState() - assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1) - assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1) - assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1) - // we have limited batch size to 6, so atleast 3 syncs needed - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) - - if !s.t.Failed() { - // wait for first RegisterDelayedCallback to hit. - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { - return sentPause - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { - // keep adding 1 more row - guarantee finishing another sync - addRows(1) - // isPaused - set from the WaitFor that sends update signal - return isPaused - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { - return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil - }) - - // we have a paused mirror, wait for second signal to hit. - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { - return sentUpdate - }) + // we have a paused mirror, wait for second signal to hit. 
+ e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { + return sentUpdate + }) - // add rows to both tables before resuming - should handle - addRows(18) + // add rows to both tables before resuming - should handle + addRows(18) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { - return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { - return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil - }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { + return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil + }) - workflowState = getWorkflowState() - assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) - assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) - assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) - // 3 from first insert of 18 rows in 1 table - // 1 from pre-pause - // 3 from second insert of 18 rows in 2 tables, batch size updated - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) - } + workflowState = getWorkflowState() + assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) + // 3 from first insert of 18 rows in 1 table + // 1 from pre-pause + // 3 from second insert of 18 rows in 2 tables, batch size updated + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, &workflowState) e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/model/signals.go b/flow/model/signals.go index 3a6d536d26..5e30defd63 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -130,6 +130,22 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{ Name: "cdc-dynamic-properties", } +var SyncStopSignal = TypedSignal[struct{}]{ + Name: "sync-stop", +} + +var SyncErrorSignal = TypedSignal[string]{ + Name: "sync-error", +} + +var SyncResultSignal = TypedSignal[SyncResponse]{ + Name: "sync-result", +} + +var SyncOptionsSignal = TypedSignal[*protos.SyncFlowOptions]{ + Name: "sync-options", +} + var NormalizeSignal = TypedSignal[NormalizePayload]{ Name: "normalize", } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 398a09d47c..3404bbfa25 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -21,15 +21,11 @@ import ( 
"github.com/PeerDB-io/peer-flow/shared" ) -const ( - maxSyncFlowsPerCDCFlow = 32 -) - type CDCFlowWorkflowState struct { // Progress events for the peer flow. Progress []string // Accumulates status for sync flows spawned. - SyncFlowStatuses []*model.SyncResponse + SyncFlowStatuses []model.SyncResponse // Accumulates status for normalize flows spawned. NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. @@ -57,8 +53,8 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow return &CDCFlowWorkflowState{ Progress: []string{"started"}, // 1 more than the limit of 10 - SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), - NormalizeFlowStatuses: nil, + SyncFlowStatuses: make([]model.SyncResponse, 0, 11), + NormalizeFlowStatuses: make([]model.NormalizeResponse, 0, 11), ActiveSignal: model.NoopSignal, SyncFlowErrors: nil, NormalizeFlowErrors: nil, @@ -102,6 +98,8 @@ func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { type CDCFlowWorkflowExecution struct { flowExecutionID string logger log.Logger + syncFlowFuture workflow.ChildWorkflowFuture + normFlowFuture workflow.ChildWorkflowFuture } // NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. @@ -142,6 +140,10 @@ func GetChildWorkflowID( // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. type CDCFlowWorkflowResult = CDCFlowWorkflowState +const ( + maxSyncsPerCdcFlow = 60 +) + func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, @@ -207,6 +209,54 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return nil } +func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( + ctx workflow.Context, + selector workflow.Selector, + state *CDCFlowWorkflowState, +) { + // add a signal to change CDC properties + cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) + cdcPropertiesSignalChan.AddToSelector(selector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { + // only modify for options since SyncFlow uses it + if cdcConfigUpdate.BatchSize > 0 { + state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout + } + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } + + if w.syncFlowFuture != nil { + _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) + } + + w.logger.Info("CDC Signal received. 
Parameters on signal reception:", + slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), + slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + }) +} + +func (w *CDCFlowWorkflowExecution) startSyncFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions) { + w.syncFlowFuture = workflow.ExecuteChildWorkflow( + ctx, + SyncFlowWorkflow, + config, + options, + ) +} + +func (w *CDCFlowWorkflowExecution) startNormFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs) { + w.normFlowFuture = workflow.ExecuteChildWorkflow( + ctx, + NormalizeFlowWorkflow, + config, + nil, + ) +} + func CDCFlowWorkflow( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, @@ -221,6 +271,7 @@ func CDCFlowWorkflow( } w := NewCDCFlowWorkflowExecution(ctx, cfg.FlowJobName) + flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil @@ -241,10 +292,42 @@ func CDCFlowWorkflow( if err != nil { return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err) } + mirrorNameSearch := map[string]interface{}{ shared.MirrorNameSearchAttribute: cfg.FlowJobName, } + if state.ActiveSignal == model.PauseSignal { + selector := workflow.NewNamedSelector(ctx, "PauseLoop") + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) { + state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) + }) + w.addCdcPropertiesSignalListener(ctx, selector, state) + + startTime := workflow.Now(ctx) + state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED + + for state.ActiveSignal == model.PauseSignal { + // only place we block on receive, so signal processing is immediate + for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil { + w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + selector.Select(ctx) + } + if err := ctx.Err(); err != nil { + return state, err + } + + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) + if err != nil { + return state, err + } + } + + w.logger.Info("mirror has been resumed after ", time.Since(startTime)) + state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING + } + originalRunID := workflow.GetInfo(ctx).OriginalRunID // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled @@ -356,40 +439,23 @@ func CDCFlowWorkflow( } } - sessionOptions := &workflow.SessionOptions{ - CreationTimeout: 5 * time.Minute, - ExecutionTimeout: 144 * time.Hour, - HeartbeatTimeout: time.Minute, - } - syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) - if err != nil { - return nil, err - } - defer workflow.CompleteSession(syncSessionCtx) - sessionInfo := workflow.GetSessionInfo(syncSessionCtx) - - syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 14 * 24 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - fMaintain := workflow.ExecuteActivity( - syncCtx, - flowable.MaintainPull, - cfg, - sessionInfo.SessionID, - ) + syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID) + normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID) - currentSyncFlowNum := 0 - 
totalRecordsSynced := int64(0) + var restart, finished bool + syncCount := 0 - var canceled bool - mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") - mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { - canceled = true - }) + syncFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: syncFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, + } + syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts) - normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID) normalizeFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: normalizeFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -400,49 +466,94 @@ func CDCFlowWorkflow( WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts) - normalizeFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil) - var waitSelector workflow.Selector - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) - if !parallel { - waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") - waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { - canceled = true - }) - waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) - waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {}) + handleError := func(name string, err error) { + var panicErr *temporal.PanicError + if errors.As(err, &panicErr) { + w.logger.Error( + "panic in flow", + slog.String("name", name), + slog.Any("error", panicErr.Error()), + slog.String("stack", panicErr.StackTrace()), + ) + } else { + w.logger.Error("error in flow", slog.String("name", name), slog.Any("error", err)) + } } - finishNormalize := func() { - model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{ - Done: true, - SyncBatchID: -1, - }) - if err := normalizeFlowFuture.Get(ctx, nil); err != nil { - w.logger.Error("failed to execute normalize flow", slog.Any("error", err)) - var panicErr *temporal.PanicError - if errors.As(err, &panicErr) { - w.logger.Error("PANIC", panicErr.Error(), panicErr.StackTrace()) - } - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } + finishSyncNormalize := func() { + restart = true + _ = model.SyncStopSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, struct{}{}).Get(ctx, nil) } - mainLoopSelector.AddFuture(fMaintain, func(f workflow.Future) { + mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") + mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + + var handleNormFlow, handleSyncFlow func(workflow.Future) + handleSyncFlow = func(f workflow.Future) { err := f.Get(ctx, nil) if err != nil { - w.logger.Error("MaintainPull failed", slog.Any("error", err)) - canceled = true + handleError("sync", err) + state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } - }) - flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) + if restart { + w.logger.Info("sync finished, finishing normalize") + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, model.NormalizePayload{ + Done: true, + SyncBatchID: -1, + }).Get(ctx, nil) + } else { + w.logger.Warn("sync flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + w.startSyncFlow(syncCtx, cfg, 
state.SyncFlowOptions) + mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow) + } + } + handleNormFlow = func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + handleError("normalize", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } + + if restart { + w.logger.Info("normalize finished") + finished = true + } else { + w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + w.startNormFlow(normCtx, cfg) + mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow) + } + } + + w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions) + mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow) + + w.startNormFlow(normCtx, cfg) + mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow) + flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) }) + syncErrorChan := model.SyncErrorSignal.GetSignalChannel(ctx) + syncErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { + syncCount += 1 + state.SyncFlowErrors = append(state.SyncFlowErrors, err) + }) + syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx) + syncResultChan.AddToSelector(mainLoopSelector, func(result model.SyncResponse, _ bool) { + syncCount += 1 + if state.SyncFlowOptions.RelationMessageMapping == nil { + state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping + } else { + maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping) + } + state.SyncFlowStatuses = append(state.SyncFlowStatuses, result) + }) + normErrorChan := model.NormalizeErrorSignal.GetSignalChannel(ctx) normErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err) @@ -453,161 +564,48 @@ func CDCFlowWorkflow( state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, result) }) - // add a signal to change CDC properties - cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) - cdcPropertiesSignalChan.AddToSelector(mainLoopSelector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { - // only modify for options since SyncFlow uses it - if cdcConfigUpdate.BatchSize > 0 { - state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize - } - if cdcConfigUpdate.IdleTimeout > 0 { - state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout - } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } + normChan := model.NormalizeSignal.GetSignalChannel(ctx) + normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) { + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, payload).Get(ctx, nil) + maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping) + }) - w.logger.Info("CDC Signal received. 
Parameters on signal reception:", - slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), - slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() }) + if !parallel { + normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { + _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + }) + } + w.addCdcPropertiesSignalListener(ctx, mainLoopSelector, state) + + state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING for { - for !canceled && mainLoopSelector.HasPending() { + mainLoopSelector.Select(ctx) + for ctx.Err() == nil && mainLoopSelector.HasPending() { mainLoopSelector.Select(ctx) } - if canceled { - break + if err := ctx.Err(); err != nil { + w.logger.Info("mirror canceled", slog.Any("error", err)) + return state, err } - if state.ActiveSignal == model.PauseSignal { - startTime := workflow.Now(ctx) - state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED + if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow { + finishSyncNormalize() + } - for state.ActiveSignal == model.PauseSignal { - w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) - // only place we block on receive, so signal processing is immediate + if restart { + for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { mainLoopSelector.Select(ctx) - if state.ActiveSignal == model.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err - } - } } - - w.logger.Info("mirror has been resumed after ", time.Since(startTime)) - } - - state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING - - // check if total sync flows have been completed - // since this happens immediately after we check for signals, the case of a signal being missed - // due to a new workflow starting is vanishingly low, but possible - if currentSyncFlowNum == maxSyncFlowsPerCDCFlow { - w.logger.Info("All the syncflows have completed successfully, there was a"+ - " limit on the number of syncflows to be executed: ", currentSyncFlowNum) - break - } - currentSyncFlowNum += 1 - w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) - - // execute the sync flow - syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 72 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - - w.logger.Info("executing sync flow") - syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID) - - var syncDone, syncErr bool - mustWait := waitSelector != nil - mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { - syncDone = true - - var childSyncFlowRes *model.SyncResponse - if err := f.Get(ctx, &childSyncFlowRes); err != nil { - w.logger.Error("failed to execute sync flow", slog.Any("error", err)) - state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) - syncErr = true - mustWait = false - } else if childSyncFlowRes != nil { - state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) - state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping - totalRecordsSynced += 
childSyncFlowRes.NumRecordsSynced - w.logger.Info("Total records synced: ", - slog.Int64("totalRecordsSynced", totalRecordsSynced)) - - tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. - if tableSchemaDeltasCount != 0 { - modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) - modifiedDstTables := make([]string, 0, tableSchemaDeltasCount) - for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) - } - - getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, - &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: cfg.Source, - TableIdentifiers: modifiedSrcTables, - FlowName: cfg.FlowJobName, - }) - - var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput - if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { - w.logger.Error("failed to execute schema update at source: ", err) - state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) - } else { - for i, srcTable := range modifiedSrcTables { - dstTable := modifiedDstTables[i] - state.SyncFlowOptions.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] - } - } - } - - err := model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{ - Done: false, - SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping, - }).Get(ctx, nil) - if err != nil { - w.logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err)) - mustWait = false - } - } else { - mustWait = false + if err := ctx.Err(); err != nil { + w.logger.Info("mirror canceled", slog.Any("error", err)) + return state, err } - }) - - for !syncDone && !canceled { - mainLoopSelector.Select(ctx) - } - if canceled { - break - } - if syncErr { - state.TruncateProgress(w.logger) return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } - if mustWait { - waitSelector.Select(ctx) - } - } - - finishNormalize() - state.TruncateProgress(w.logger) - if err := ctx.Err(); err != nil { - return nil, err } - - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 057e4c11a2..5c87b8d05b 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -78,61 +78,60 @@ func NormalizeFlowWorkflow( state.Wait = false }) - for { - for state.Wait && ctx.Err() == nil { - selector.Select(ctx) - } - if ProcessLoop(ctx, logger, selector, state) { - return ctx.Err() - } + for state.Wait && ctx.Err() == nil { + selector.Select(ctx) + } + if ProcessLoop(ctx, logger, selector, state) { + return ctx.Err() + } - if state.LastSyncBatchID != state.SyncBatchID { - state.LastSyncBatchID = state.SyncBatchID - - logger.Info("executing normalize") - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, - TableNameSchemaMapping: state.TableNameSchemaMapping, - SyncBatchID: state.SyncBatchID, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, 
startNormalizeInput) - - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - _ = model.NormalizeErrorSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - err.Error(), - ).Get(ctx, nil) - } else if normalizeResponse != nil { - _ = model.NormalizeResultSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - *normalizeResponse, - ).Get(ctx, nil) - } - } + if state.LastSyncBatchID != state.SyncBatchID { + state.LastSyncBatchID = state.SyncBatchID - if !state.Stop { - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) - - if !parallel { - _ = model.NormalizeDoneSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - struct{}{}, - ).Get(ctx, nil) - } + logger.Info("executing normalize") + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: config, + TableNameSchemaMapping: state.TableNameSchemaMapping, + SyncBatchID: state.SyncBatchID, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + _ = model.NormalizeErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + err.Error(), + ).Get(ctx, nil) + } else if normalizeResponse != nil { + _ = model.NormalizeResultSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + *normalizeResponse, + ).Get(ctx, nil) } + } - state.Wait = true - if ProcessLoop(ctx, logger, selector, state) { - return ctx.Err() + if !state.Stop { + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + + if !parallel { + _ = model.NormalizeDoneSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + struct{}{}, + ).Get(ctx, nil) } } + + state.Wait = true + if ProcessLoop(ctx, logger, selector, state) { + return ctx.Err() + } + return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state) } diff --git a/flow/workflows/register.go b/flow/workflows/register.go index fce5e92989..44c7cb00be 100644 --- a/flow/workflows/register.go +++ b/flow/workflows/register.go @@ -9,6 +9,7 @@ func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) { w.RegisterWorkflow(DropFlowWorkflow) w.RegisterWorkflow(NormalizeFlowWorkflow) w.RegisterWorkflow(SetupFlowWorkflow) + w.RegisterWorkflow(SyncFlowWorkflow) w.RegisterWorkflow(QRepFlowWorkflow) w.RegisterWorkflow(QRepPartitionWorkflow) w.RegisterWorkflow(XminFlowWorkflow) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8d2ee29d74..fd8f539083 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -299,7 +299,7 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon var sessionError error var snapshotName string - sessionSelector := workflow.NewNamedSelector(ctx, "Export Snapshot Setup") + sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup") sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { // MaintainTx should never exit without an error before this point sessionError = f.Get(exportCtx, nil) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go new file mode 100644 index 0000000000..87af1d1d82 --- /dev/null +++ b/flow/workflows/sync_flow.go @@ -0,0 +1,211 @@ +package peerflow + +import ( + "log/slog" + "time" + + 
"go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared" +) + +const ( + maxSyncsPerSyncFlow = 72 +) + +func SyncFlowWorkflow( + ctx workflow.Context, + config *protos.FlowConnectionConfigs, + options *protos.SyncFlowOptions, +) error { + parent := workflow.GetInfo(ctx).ParentWorkflowExecution + logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) + + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 144 * time.Hour, + HeartbeatTimeout: time.Minute, + } + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return err + } + defer workflow.CompleteSession(syncSessionCtx) + sessionInfo := workflow.GetSessionInfo(syncSessionCtx) + + syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain := workflow.ExecuteActivity( + syncCtx, + flowable.MaintainPull, + config, + sessionInfo.SessionID, + ) + + var stop, syncErr bool + currentSyncFlowNum := 0 + totalRecordsSynced := int64(0) + + selector := workflow.NewNamedSelector(ctx, "SyncLoop") + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + selector.AddFuture(fMaintain, func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + logger.Error("MaintainPull failed", slog.Any("error", err)) + syncErr = true + } + }) + + stopChan := model.SyncStopSignal.GetSignalChannel(ctx) + stopChan.AddToSelector(selector, func(_ struct{}, _ bool) { + stop = true + }) + + var waitSelector workflow.Selector + parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + if !parallel { + waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") + waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {}) + stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) { + stop = true + }) + } + + for !stop && ctx.Err() == nil { + var syncDone bool + mustWait := waitSelector != nil + + // execute the sync flow + currentSyncFlowNum += 1 + logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) + + syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 72 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + + syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID) + selector.AddFuture(syncFlowFuture, func(f workflow.Future) { + syncDone = true + + var childSyncFlowRes *model.SyncResponse + if err := f.Get(ctx, &childSyncFlowRes); err != nil { + logger.Error("failed to execute sync flow", slog.Any("error", err)) + _ = model.SyncErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + err.Error(), + ).Get(ctx, nil) + syncErr = true + mustWait = false + } else if childSyncFlowRes != nil { + _ = model.SyncResultSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + *childSyncFlowRes, + ).Get(ctx, nil) + options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += 
childSyncFlowRes.NumRecordsSynced + logger.Info("Total records synced: ", + slog.Int64("totalRecordsSynced", totalRecordsSynced)) + + tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) + + // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. + if tableSchemaDeltasCount != 0 { + modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) + modifiedDstTables := make([]string, 0, tableSchemaDeltasCount) + for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) + } + + getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, + &protos.GetTableSchemaBatchInput{ + PeerConnectionConfig: config.Source, + TableIdentifiers: modifiedSrcTables, + FlowName: config.FlowJobName, + }) + + var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput + if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { + logger.Error("failed to execute schema update at source: ", err) + _ = model.SyncErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + err.Error(), + ).Get(ctx, nil) + } else { + for i, srcTable := range modifiedSrcTables { + dstTable := modifiedDstTables[i] + options.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] + } + } + } + + err := model.NormalizeSignal.SignalExternalWorkflow( + ctx, + parent.ID, + "", + model.NormalizePayload{ + Done: false, + SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, + TableNameSchemaMapping: options.TableNameSchemaMapping, + }, + ).Get(ctx, nil) + if err != nil { + logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err)) + mustWait = false + } + } else { + mustWait = false + } + }) + + for ctx.Err() == nil && (!syncDone || selector.HasPending()) { + selector.Select(ctx) + } + if ctx.Err() != nil { + break + } + + restart := currentSyncFlowNum >= maxSyncsPerSyncFlow || syncErr + if !stop && mustWait { + waitSelector.Select(ctx) + if restart { + // must flush selector for signals received while waiting + for ctx.Err() == nil && selector.HasPending() { + selector.Select(ctx) + } + break + } + } else if restart { + break + } + } + if err := ctx.Err(); err != nil { + logger.Info("sync canceled: %v", err) + return err + } else if stop { + return nil + } + return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options) +}
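
Not part of the patch above: a minimal, hypothetical sketch of the parent/child stop handshake that the cdc_flow / sync_flow split relies on, written only against public Temporal Go SDK APIs. ParentWorkflow, ChildWorkflow, and the "stop" signal name are illustrative stand-ins (roughly playing the roles of CDCFlowWorkflow, SyncFlowWorkflow, and model.SyncStopSignal), not PeerDB code.

package example

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

const stopSignal = "stop" // stand-in for model.SyncStopSignal

// ChildWorkflow loops doing work until the parent asks it to stop, then
// returns so the parent can continue-as-new without losing in-flight work.
func ChildWorkflow(ctx workflow.Context) error {
	stop := false
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
	selector.AddReceive(workflow.GetSignalChannel(ctx, stopSignal), func(c workflow.ReceiveChannel, _ bool) {
		c.Receive(ctx, nil)
		stop = true
	})

	for !stop && ctx.Err() == nil {
		// stand-in for one sync batch
		if err := workflow.Sleep(ctx, time.Second); err != nil {
			return err
		}
		// drain signals that arrived during the batch without blocking
		for ctx.Err() == nil && selector.HasPending() {
			selector.Select(ctx)
		}
	}
	return ctx.Err()
}

// ParentWorkflow starts the child, later asks it to stop, waits for it to
// exit, and only then restarts itself via continue-as-new, keeping history
// bounded on both sides.
func ParentWorkflow(ctx workflow.Context) error {
	childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
		WorkflowID: "child-" + workflow.GetInfo(ctx).OriginalRunID,
	})
	child := workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow)

	// main loop elided: process signals, count syncs, decide when to restart
	if err := workflow.Sleep(ctx, time.Minute); err != nil {
		return err
	}

	// stop handshake: signal the child, wait for it to finish, then restart
	if err := child.SignalChildWorkflow(ctx, stopSignal, struct{}{}).Get(ctx, nil); err != nil {
		return err
	}
	if err := child.Get(ctx, nil); err != nil {
		return err
	}
	return workflow.NewContinueAsNewError(ctx, ParentWorkflow)
}

The same shape appears in the patch: finishSyncNormalize sends SyncStopSignal to the sync child, the main loop drains remaining signals until the normalize child reports it is finished, and only then does CDCFlowWorkflow return NewContinueAsNewError.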