Skip to content

Commit

Permalink
explicitly call EnsurePullability while adding tables
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 15, 2024
1 parent 59d7ee2 commit 3e64300
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []

func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error {
if c.conn == nil {
return fmt.Errorf("check replication permissions: conn is nil")
return errors.New("check replication permissions: conn is nil")
}

var replicationRes bool
Expand All @@ -695,7 +695,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use
var setting string
err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting)
if err != nil || setting != "on" {
return fmt.Errorf("postgres user does not have replication role")
return errors.New("postgres user does not have replication role")
}
}

Expand Down
26 changes: 25 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,29 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
return err
}

ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
ensurePullabilityFuture := workflow.ExecuteActivity(
ensurePullabilityCtx,
flowable.EnsurePullability,
&protos.EnsurePullabilityBatchInput{
PeerConnectionConfig: cfg.Source,
FlowJobName: cfg.FlowJobName,
SourceTableIdentifiers: func() []string {
additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables))
for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables {
additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier)
}
return additionalSourceTables
}(),
CheckConstraints: true,
})
if err := ensurePullabilityFuture.Get(ctx, nil); err != nil {
w.logger.Error("failed to ensure pullability for additional tables: ", err)
return err
}

additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialSnapshot = true
additionalTablesWorkflowCfg.InitialSnapshotOnly = true
Expand Down Expand Up @@ -217,7 +240,7 @@ func CDCFlowWorkflowWithConfig(
state *CDCFlowWorkflowState,
) (*CDCFlowWorkflowResult, error) {
if cfg == nil {
return nil, fmt.Errorf("invalid connection configs")
return nil, errors.New("invalid connection configs")
}
if state == nil {
state = NewCDCFlowWorkflowState(cfg.TableMappings)
Expand Down Expand Up @@ -471,6 +494,7 @@ func CDCFlowWorkflowWithConfig(
// only place we block on receive, so signal processing is immediate
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
Expand Down

0 comments on commit 3e64300

Please sign in to comment.