From 0efc334c48398273a2318b932be563fdba85b2b2 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 29 Feb 2024 19:11:44 +0530 Subject: [PATCH] Revert "making UX for edit mirror simpler (#1338)" This reverts commit b6eb5d1e138ec834c65c0f4e5240e44959a2ce32. --- flow/e2e/postgres/peer_flow_pg_test.go | 10 ++++++++ flow/workflows/cdc_flow.go | 33 +++++++++++-------------- ui/app/mirrors/[mirrorId]/edit/page.tsx | 16 +++++------- ui/app/mirrors/[mirrorId]/page.tsx | 10 ++------ ui/components/EditButton.tsx | 14 +++-------- 5 files changed, 36 insertions(+), 47 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 31db45788d..6d5d68f8fd 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1255,6 +1255,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { sentUpdate = true return true }) + }, 28*time.Second) + env.RegisterDelayedCallback(func() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { + if !sentUpdate { + return false + } + e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) + s.t.Log("Sent resume signal") + return true + }) }, 56*time.Second) go func() { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 382efb3a26..0fa1110a85 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct { RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed - FlowConfigUpdate *protos.CDCFlowConfigUpdate + FlowConfigUpdates []*protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions } @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow SyncFlowErrors: nil, NormalizeFlowErrors: nil, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - FlowConfigUpdate: nil, + FlowConfigUpdates: nil, SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, @@ -144,18 +144,17 @@ const ( maxSyncsPerCdcFlow = 60 ) -func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, +func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, ) error { - flowConfigUpdate := state.FlowConfigUpdate - if flowConfigUpdate != nil { + for _, flowConfigUpdate := range state.FlowConfigUpdates { if len(flowConfigUpdate.AdditionalTables) == 0 { - return nil + continue } if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") - return nil + continue } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -201,12 +200,12 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping) + maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping) state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) - - // finished processing, wipe it - state.FlowConfigUpdate = nil } + // finished processing, wipe it + state.FlowConfigUpdates = nil return nil } @@ -224,8 +223,9 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - // do this irrespective of additional tables being present, for auto unpausing - state.FlowConfigUpdate = cdcConfigUpdate + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } if w.syncFlowFuture != nil { _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) @@ -307,12 +307,9 @@ func CDCFlowWorkflow( return state, err } - if state.FlowConfigUpdate != nil { - err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err - } - state.ActiveSignal = model.NoopSignal + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) + if err != nil { + return state, err } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 4cff1b4538..1d9c6aa850 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -114,10 +114,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { } }; - const isNotPaused = - mirrorState.currentFlowState.toString() !== - FlowStatus[FlowStatus.STATUS_PAUSED]; - return (