diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index a57613edda..0ec31befe4 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1209,8 +1209,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1) assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1) assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1) - // we have limited batch size to 6, so atleast 3 syncs needed - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) if !s.t.Failed() { // wait for first RegisterDelayedCallback to hit. @@ -1283,9 +1281,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) - // 3 from first insert of 18 rows in 1 table - // TODO 3 from second insert of 18 rows in 2 tables, batch size updated - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 4) env.CancelWorkflow() }() diff --git a/flow/model/signals.go b/flow/model/signals.go index 5e30defd63..45ec805339 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -134,11 +134,7 @@ var SyncStopSignal = TypedSignal[struct{}]{ Name: "sync-stop", } -var SyncErrorSignal = TypedSignal[string]{ - Name: "sync-error", -} - -var SyncResultSignal = TypedSignal[SyncResponse]{ +var SyncResultSignal = TypedSignal[*SyncResponse]{ Name: "sync-result", } @@ -150,14 +146,6 @@ var NormalizeSignal = TypedSignal[NormalizePayload]{ Name: "normalize", } -var NormalizeErrorSignal = TypedSignal[string]{ - Name: "normalize-error", -} - -var NormalizeResultSignal = TypedSignal[NormalizeResponse]{ - Name: "normalize-result", -} - var NormalizeDoneSignal = TypedSignal[struct{}]{ Name: "normalize-done", } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1891d7f6c1..419c2df3db 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -22,18 +22,8 @@ import ( ) type CDCFlowWorkflowState struct { - // Progress events for the peer flow. - Progress []string - // Accumulates status for sync flows spawned. - SyncFlowStatuses []model.SyncResponse - // Accumulates status for normalize flows spawned. - NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. ActiveSignal model.CDCFlowSignal - // Errors encountered during child sync flow executions. - SyncFlowErrors []string - // Errors encountered during child sync flow executions. - NormalizeFlowErrors []string // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping @@ -51,15 +41,10 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping)) } return &CDCFlowWorkflowState{ - Progress: []string{"started"}, // 1 more than the limit of 10 - SyncFlowStatuses: make([]model.SyncResponse, 0, 11), - NormalizeFlowStatuses: make([]model.NormalizeResponse, 0, 11), - ActiveSignal: model.NoopSignal, - SyncFlowErrors: nil, - NormalizeFlowErrors: nil, - CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - FlowConfigUpdate: nil, + ActiveSignal: model.NoopSignal, + CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, + FlowConfigUpdate: nil, SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, @@ -68,32 +53,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow } } -// truncate the progress and other arrays to a max of 10 elements -func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { - if len(s.Progress) > 10 { - copy(s.Progress, s.Progress[len(s.Progress)-10:]) - s.Progress = s.Progress[:10] - } - if len(s.SyncFlowStatuses) > 10 { - copy(s.SyncFlowStatuses, s.SyncFlowStatuses[len(s.SyncFlowStatuses)-10:]) - s.SyncFlowStatuses = s.SyncFlowStatuses[:10] - } - if len(s.NormalizeFlowStatuses) > 10 { - copy(s.NormalizeFlowStatuses, s.NormalizeFlowStatuses[len(s.NormalizeFlowStatuses)-10:]) - s.NormalizeFlowStatuses = s.NormalizeFlowStatuses[:10] - } - - if s.SyncFlowErrors != nil { - logger.Warn("SyncFlowErrors", slog.Any("errors", s.SyncFlowErrors)) - s.SyncFlowErrors = nil - } - - if s.NormalizeFlowErrors != nil { - logger.Warn("NormalizeFlowErrors", slog.Any("errors", s.NormalizeFlowErrors)) - s.NormalizeFlowErrors = nil - } -} - // CDCFlowWorkflowExecution represents the state for execution of a peer flow. type CDCFlowWorkflowExecution struct { flowExecutionID string @@ -429,7 +388,7 @@ func CDCFlowWorkflow( } state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING - state.Progress = append(state.Progress, "executed setup flow and snapshot flow") + w.logger.Info("executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. if cfg.InitialSnapshotOnly { @@ -492,7 +451,6 @@ func CDCFlowWorkflow( err := f.Get(ctx, nil) if err != nil { handleError("sync", err) - state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } if restart { @@ -504,7 +462,6 @@ func CDCFlowWorkflow( }).Get(ctx, nil) } else { w.logger.Warn("sync flow ended, restarting", slog.Any("error", err)) - state.TruncateProgress(w.logger) w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions) mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow) } @@ -513,7 +470,6 @@ func CDCFlowWorkflow( err := f.Get(ctx, nil) if err != nil { handleError("normalize", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) } if restart { @@ -522,7 +478,6 @@ func CDCFlowWorkflow( finished = true } else { w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) - state.TruncateProgress(w.logger) w.startNormFlow(normCtx, cfg) mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow) } @@ -538,30 +493,16 @@ func CDCFlowWorkflow( state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) }) - syncErrorChan := model.SyncErrorSignal.GetSignalChannel(ctx) - syncErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { - syncCount += 1 - state.SyncFlowErrors = append(state.SyncFlowErrors, err) - }) syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx) - syncResultChan.AddToSelector(mainLoopSelector, func(result model.SyncResponse, _ bool) { + syncResultChan.AddToSelector(mainLoopSelector, func(result *model.SyncResponse, _ bool) { syncCount += 1 - if state.SyncFlowOptions.RelationMessageMapping == nil { - state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping - } else { - maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping) + if result != nil { + if state.SyncFlowOptions.RelationMessageMapping == nil { + state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping + } else { + maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping) + } } - state.SyncFlowStatuses = append(state.SyncFlowStatuses, result) - }) - - normErrorChan := model.NormalizeErrorSignal.GetSignalChannel(ctx) - normErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) { - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err) - }) - - normResultChan := model.NormalizeResultSignal.GetSignalChannel(ctx) - normResultChan.AddToSelector(mainLoopSelector, func(result model.NormalizeResponse, _ bool) { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, result) }) normChan := model.NormalizeSignal.GetSignalChannel(ctx) @@ -613,8 +554,6 @@ func CDCFlowWorkflow( return nil, err } - // important to control the size of inputs. - state.TruncateProgress(w.logger) return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 5c87b8d05b..adc7a7991a 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -98,19 +98,9 @@ func NormalizeFlowWorkflow( var normalizeResponse *model.NormalizeResponse if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - _ = model.NormalizeErrorSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - err.Error(), - ).Get(ctx, nil) + logger.Info("Normalize errored", slog.Any("error", err)) } else if normalizeResponse != nil { - _ = model.NormalizeResultSignal.SignalExternalWorkflow( - ctx, - parent.ID, - "", - *normalizeResponse, - ).Get(ctx, nil) + logger.Info("Normalize finished", slog.Any("result", normalizeResponse)) } } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 2ce225f260..0e4c786e2c 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -13,6 +13,9 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) +// For now cdc restarts sync flow whenever it itself restarts, +// set this value high enough to never be met, relying on cdc restarts. +// In the future cdc flow restarts could be decoupled from sync flow restarts. const ( maxSyncsPerSyncFlow = 64 ) @@ -103,11 +106,11 @@ func SyncFlowWorkflow( var childSyncFlowRes *model.SyncResponse if err := f.Get(ctx, &childSyncFlowRes); err != nil { logger.Error("failed to execute sync flow", slog.Any("error", err)) - _ = model.SyncErrorSignal.SignalExternalWorkflow( + _ = model.SyncResultSignal.SignalExternalWorkflow( ctx, parent.ID, "", - err.Error(), + nil, ).Get(ctx, nil) syncErr = true } else if childSyncFlowRes != nil { @@ -115,7 +118,7 @@ func SyncFlowWorkflow( ctx, parent.ID, "", - *childSyncFlowRes, + childSyncFlowRes, ).Get(ctx, nil) options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping totalRecordsSynced += childSyncFlowRes.NumRecordsSynced @@ -145,12 +148,12 @@ func SyncFlowWorkflow( var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { - logger.Error("failed to execute schema update at source: ", err) - _ = model.SyncErrorSignal.SignalExternalWorkflow( + logger.Error("failed to execute schema update at source", slog.Any("error", err)) + _ = model.SyncResultSignal.SignalExternalWorkflow( ctx, parent.ID, "", - err.Error(), + nil, ).Get(ctx, nil) } else { for i, srcTable := range modifiedSrcTables {