From 3e64300b77c18bda9d3a76e241f1951b1b473f40 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 15 Feb 2024 17:48:02 +0530 Subject: [PATCH] explicitly call EnsurePullability while adding tables --- flow/connectors/postgres/client.go | 4 ++-- flow/workflows/cdc_flow.go | 26 +++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 563e0c6824..2f77879c68 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -681,7 +681,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames [] func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error { if c.conn == nil { - return fmt.Errorf("check replication permissions: conn is nil") + return errors.New("check replication permissions: conn is nil") } var replicationRes bool @@ -695,7 +695,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use var setting string err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting) if err != nil || setting != "on" { - return fmt.Errorf("postgres user does not have replication role") + return errors.New("postgres user does not have replication role") } } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index afc17927ce..cb35b335c1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -164,6 +164,29 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } + ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + ensurePullabilityFuture := workflow.ExecuteActivity( + ensurePullabilityCtx, + flowable.EnsurePullability, + &protos.EnsurePullabilityBatchInput{ + PeerConnectionConfig: cfg.Source, + FlowJobName: cfg.FlowJobName, + SourceTableIdentifiers: func() []string { + additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) + for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { + additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) + } + return additionalSourceTables + }(), + CheckConstraints: true, + }) + if err := ensurePullabilityFuture.Get(ctx, nil); err != nil { + w.logger.Error("failed to ensure pullability for additional tables: ", err) + return err + } + additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialSnapshot = true additionalTablesWorkflowCfg.InitialSnapshotOnly = true @@ -217,7 +240,7 @@ func CDCFlowWorkflowWithConfig( state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { if cfg == nil { - return nil, fmt.Errorf("invalid connection configs") + return nil, errors.New("invalid connection configs") } if state == nil { state = NewCDCFlowWorkflowState(cfg.TableMappings) @@ -471,6 +494,7 @@ func CDCFlowWorkflowWithConfig( // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) if state.ActiveSignal == shared.NoopSignal { + state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) if err != nil { return state, err