From 03982d5657b8a79ce54208fd9d06c20ee1688eed Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 29 Feb 2024 19:23:01 +0530 Subject: [PATCH 1/3] Revert "making UX for edit mirror simpler" (#1403) Reverts PeerDB-io/peerdb#1338 --- flow/e2e/postgres/peer_flow_pg_test.go | 10 ++++++++ flow/workflows/cdc_flow.go | 33 +++++++++++-------------- ui/app/mirrors/[mirrorId]/edit/page.tsx | 16 +++++------- ui/app/mirrors/[mirrorId]/page.tsx | 10 ++------ ui/components/EditButton.tsx | 14 +++-------- 5 files changed, 36 insertions(+), 47 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 31db45788d..6d5d68f8fd 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1255,6 +1255,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { sentUpdate = true return true }) + }, 28*time.Second) + env.RegisterDelayedCallback(func() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { + if !sentUpdate { + return false + } + e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) + s.t.Log("Sent resume signal") + return true + }) }, 56*time.Second) go func() { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 382efb3a26..0fa1110a85 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct { RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed - FlowConfigUpdate *protos.CDCFlowConfigUpdate + FlowConfigUpdates []*protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions } @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow SyncFlowErrors: nil, NormalizeFlowErrors: nil, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - FlowConfigUpdate: nil, + FlowConfigUpdates: nil, SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, @@ -144,18 +144,17 @@ const ( maxSyncsPerCdcFlow = 60 ) -func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, +func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, ) error { - flowConfigUpdate := state.FlowConfigUpdate - if flowConfigUpdate != nil { + for _, flowConfigUpdate := range state.FlowConfigUpdates { if len(flowConfigUpdate.AdditionalTables) == 0 { - return nil + continue } if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") - return nil + continue } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -201,12 +200,12 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping) + maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping) state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) - - // finished processing, wipe it - state.FlowConfigUpdate = nil } + // finished processing, wipe it + state.FlowConfigUpdates = nil return nil } @@ -224,8 +223,9 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - // do this irrespective of additional tables being present, for auto unpausing - state.FlowConfigUpdate = cdcConfigUpdate + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } if w.syncFlowFuture != nil { _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) @@ -307,12 +307,9 @@ func CDCFlowWorkflow( return state, err } - if state.FlowConfigUpdate != nil { - err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err - } - state.ActiveSignal = model.NoopSignal + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) + if err != nil { + return state, err } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 4cff1b4538..1d9c6aa850 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -114,10 +114,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { } }; - const isNotPaused = - mirrorState.currentFlowState.toString() !== - FlowStatus[FlowStatus.STATUS_PAUSED]; - return (
@@ -182,11 +178,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { omitAdditionalTablesMapping={omitAdditionalTablesMapping} /> - {isNotPaused ? ( - - ) : ( - - )}
+ ); }; From 56b0dff9ccc8586feffe17c526e92ab68178b434 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 29 Feb 2024 20:50:15 +0530 Subject: [PATCH 2/3] fixed handling of selector for CDC dynamic properties (#1405) Reverts PeerDB-io/peerdb#1403 --- flow/e2e/postgres/peer_flow_pg_test.go | 10 ------- flow/workflows/cdc_flow.go | 39 +++++++++++++++---------- ui/app/mirrors/[mirrorId]/edit/page.tsx | 16 ++++++---- ui/app/mirrors/[mirrorId]/page.tsx | 10 +++++-- ui/components/EditButton.tsx | 14 +++++++-- 5 files changed, 52 insertions(+), 37 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 6d5d68f8fd..31db45788d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1255,16 +1255,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { sentUpdate = true return true }) - }, 28*time.Second) - env.RegisterDelayedCallback(func() { - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { - if !sentUpdate { - return false - } - e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) - s.t.Log("Sent resume signal") - return true - }) }, 56*time.Second) go func() { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0fa1110a85..6c1c76cb28 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct { RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed - FlowConfigUpdates []*protos.CDCFlowConfigUpdate + FlowConfigUpdate *protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions } @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow SyncFlowErrors: nil, NormalizeFlowErrors: nil, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - FlowConfigUpdates: nil, + FlowConfigUpdate: nil, SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, @@ -144,17 +144,18 @@ const ( maxSyncsPerCdcFlow = 60 ) -func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, +func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, ) error { - for _, flowConfigUpdate := range state.FlowConfigUpdates { + flowConfigUpdate := state.FlowConfigUpdate + if flowConfigUpdate != nil { if len(flowConfigUpdate.AdditionalTables) == 0 { - continue + return nil } if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") - continue + return nil } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -200,12 +201,16 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping) - maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping) state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) + + if w.syncFlowFuture != nil { + _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) + } + + // finished processing, wipe it + state.FlowConfigUpdate = nil } - // finished processing, wipe it - state.FlowConfigUpdates = nil return nil } @@ -223,9 +228,8 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } + // do this irrespective of additional tables being present, for auto unpausing + state.FlowConfigUpdate = cdcConfigUpdate if w.syncFlowFuture != nil { _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) @@ -299,7 +303,7 @@ func CDCFlowWorkflow( for state.ActiveSignal == model.PauseSignal { // only place we block on receive, so signal processing is immediate - for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil { + for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil { w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) selector.Select(ctx) } @@ -307,9 +311,12 @@ func CDCFlowWorkflow( return state, err } - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err + if state.FlowConfigUpdate != nil { + err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch) + if err != nil { + return state, err + } + state.ActiveSignal = model.NoopSignal } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 1d9c6aa850..4cff1b4538 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { } }; + const isNotPaused = + mirrorState.currentFlowState.toString() !== + FlowStatus[FlowStatus.STATUS_PAUSED]; + return (
@@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { omitAdditionalTablesMapping={omitAdditionalTablesMapping} /> + {isNotPaused ? ( + + ) : ( + + )}
+ ); }; From f9c883e607de5d2f7966861e6df7bbbd27356a09 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 29 Feb 2024 10:21:26 -0500 Subject: [PATCH 3/3] Pause will drop the current Sync and Normalize and hard-pause (#1406) This is a more desired behavior when a user clicks pause --- flow/workflows/cdc_flow.go | 12 ++++++++++-- flow/workflows/sync_flow.go | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 6c1c76cb28..1d4f2bea30 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -141,7 +141,7 @@ func GetChildWorkflowID( type CDCFlowWorkflowResult = CDCFlowWorkflowState const ( - maxSyncsPerCdcFlow = 60 + maxSyncsPerCdcFlow = 32 ) func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, @@ -598,13 +598,21 @@ func CDCFlowWorkflow( } if restart { + if state.ActiveSignal == model.PauseSignal { + finished = true + } + for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { mainLoopSelector.Select(ctx) } + if err := ctx.Err(); err != nil { w.logger.Info("mirror canceled", slog.Any("error", err)) - return state, err + return nil, err } + + // important to control the size of inputs. + state.TruncateProgress(w.logger) return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 6d2c9def50..2ce225f260 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -14,7 +14,7 @@ import ( ) const ( - maxSyncsPerSyncFlow = 72 + maxSyncsPerSyncFlow = 64 ) func SyncFlowWorkflow(