Skip to content

Commit

Permalink
reverted to single CDCFlow, no interface mechanism yet
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 17, 2024
1 parent 2a8554b commit 3080cae
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 145 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
p.logger.Info(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
Expand Down
54 changes: 31 additions & 23 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,38 @@ func EnvWaitForEqualTables(
table string,
cols string,
) {
// wait for PeerFlowStatusQuery to finish setup
// sleep for 5 seconds to allow the workflow to start
time.Sleep(5 * time.Second)
for {
response, err := env.QueryWorkflow(
shared.CDCFlowStateQuery,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
slog.Error(err.Error())
}
suite.T().Helper()
EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols)
}

if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
break
}
} else {
// log the error for informational purposes
slog.Error(err.Error())
func EnvWaitForEqualTablesWithNames(
env *testsuite.TestWorkflowEnvironment,
suite e2eshared.RowSource,
reason string,
srcTable string,
dstTable string,
cols string,
) {
t := suite.T()
t.Helper()

EnvWaitFor(t, env, 3*time.Minute, reason, func() bool {
t.Helper()

suffix := suite.Suffix()
pool := suite.Pool()
pgRows, err := GetPgRows(pool, suffix, srcTable, cols)
if err != nil {
return false
}
time.Sleep(1 * time.Second)
}

rows, err := suite.GetRows(dstTable, cols)
if err != nil {
return false
}

return e2eshared.CheckEqualRecordBatches(t, pgRows, rows)
})
}

func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironment,
Expand All @@ -179,7 +187,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
err = response.Get(&state)
if err != nil {
slog.Error(err.Error())
} else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
} else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING {
return
}
} else if counter > 15 {
Expand Down
3 changes: 1 addition & 2 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const (
FlowStatusQuery = "q-flow-status"

// Updates
FlowStatusUpdate = "u-flow-status"
CDCFlowConfigUpdate = "u-cdc-flow-config-update"
FlowStatusUpdate = "u-flow-status"
)

const MirrorNameSearchAttribute = "MirrorName"
Expand Down
119 changes: 28 additions & 91 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type CDCFlowWorkflowState struct {
TableNameSchemaMapping map[string]*protos.TableSchema
// pending flow config update requests, set to nil after they have been processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// maintains copies of SrcTableIdNameMapping and TableNameSchemaMapping from protos.FlowConnectionConfigs
// ideally they shouldn't even be in the config, since they are set dynamically in SetupFlow and should live only in state
SrcTableIdNameMapping map[uint32]string
TableNameSchemaMapping map[string]*protos.TableSchema
}

type SignalProps struct {
Expand Down Expand Up @@ -177,7 +181,7 @@ func additionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch *map[string]interface{},
limits *CDCFlowLimits, mirrorNameSearch *map[string]interface{},
) error {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
if len(flowConfigUpdate.AdditionalTables) == 0 {
Expand All @@ -199,102 +203,47 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
return err
}

additionalTablesSetupFlowID, err := GetChildWorkflowID(ctx,
"additional-tables-setup-flow", cfg.FlowJobName)
if err != nil {
return err
}
additionalTablesSetupFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: additionalTablesSetupFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 2,
},
SearchAttributes: *mirrorNameSearch,
}

additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialCopy = true
additionalTablesWorkflowCfg.InitialCopyOnly = true
additionalTablesWorkflowCfg.ForcePkeyChecks = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables
additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName,
strings.ToLower(shared.RandomString(8)))

additionalTablesSetupCtx := workflow.WithChildOptions(ctx,
additionalTablesSetupFlowOpts)
additionalTablesSetupFlowFuture := workflow.ExecuteChildWorkflow(
additionalTablesSetupCtx,
SetupFlowWorkflow,
additionalTablesWorkflowCfg,
)
if err := additionalTablesSetupFlowFuture.Get(additionalTablesSetupCtx,
&additionalTablesWorkflowCfg); err != nil {
w.logger.Error("failed to execute SetupFlow for additional tables: ", err)
return fmt.Errorf("failed to execute SetupFlow for additional tables: %w", err)
}

// next part of the setup is to snapshot-initial-copy and setup replication slots.
additionalTablesSnapshotFlowID, err := GetChildWorkflowID(ctx,
"additional-tables-snapshot-flow", cfg.FlowJobName)
childAdditionalTablesCDCFlowID,
err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
if err != nil {
return err
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if err != nil {
return err
}

additionalTablesSnapshotFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: additionalTablesSnapshotFlowID,
// execute the sync flow as a child workflow
childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: childAdditionalTablesCDCFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TaskQueue: taskQueue,
SearchAttributes: *mirrorNameSearch,
}
additionalTablesSnapshotFlowCtx := workflow.WithChildOptions(ctx, additionalTablesSnapshotFlowOpts)
additionalTablesSnapshotFlowFuture := workflow.ExecuteChildWorkflow(additionalTablesSnapshotFlowCtx,
SnapshotFlowWorkflow, additionalTablesWorkflowCfg)
if err := additionalTablesSnapshotFlowFuture.Get(additionalTablesSnapshotFlowCtx, nil); err != nil {
return fmt.Errorf("failed to execute child workflow: %w", err)
}

additionalTablesDropFlowID, err := GetChildWorkflowID(ctx,
"additional-tables-drop-flow", cfg.FlowJobName)
if err != nil {
return err
SearchAttributes: *mirrorNameSearch,
WaitForCancellation: true,
}
additionalTablesDropFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: additionalTablesDropFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
},
SearchAttributes: *mirrorNameSearch,
}
additionalTablesDropCtx := workflow.WithChildOptions(ctx,
additionalTablesDropFlowOpts)
additionalTablesDropFlowFuture := workflow.ExecuteChildWorkflow(
additionalTablesDropCtx,
DropFlowWorkflow,
&protos.ShutdownRequest{
WorkflowId: additionalTablesSetupFlowID,
FlowJobName: additionalTablesWorkflowCfg.FlowJobName,
SourcePeer: cfg.Source,
DestinationPeer: cfg.Destination,
RemoveFlowEntry: false,
},
childAdditionalTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAdditionalTablesCDCFlowOpts)
childAdditionalTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
childAdditionalTablesCDCFlowCtx,
CDCFlowWorkflowWithConfig,
additionalTablesWorkflowCfg,
nil,
limits,
)
if err := additionalTablesDropFlowFuture.Get(additionalTablesDropCtx, nil); err != nil {
w.logger.Error("failed to execute DropFlow for additional tables: ", err)
return fmt.Errorf("failed to execute DropFlow for additional tables: %w", err)
var res *CDCFlowWorkflowResult
if err := childAdditionalTablesCDCFlowFuture.Get(childAdditionalTablesCDCFlowCtx, &res); err != nil {
return err
}

for tableID, tableName := range additionalTablesWorkflowCfg.SrcTableIdNameMapping {
for tableID, tableName := range res.SrcTableIdNameMapping {
cfg.SrcTableIdNameMapping[tableID] = tableName
}
for tableName, tableSchema := range additionalTablesWorkflowCfg.TableNameSchemaMapping {
for tableName, tableSchema := range res.TableNameSchemaMapping {
cfg.TableNameSchemaMapping[tableName] = tableSchema
}
cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
Expand Down Expand Up @@ -338,18 +287,6 @@ func CDCFlowWorkflowWithConfig(
if err != nil {
return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err)
}
err = workflow.SetUpdateHandler(ctx, shared.CDCFlowConfigUpdate,
func(cdcFlowConfigUpdate *protos.CDCFlowConfigUpdate) error {
if state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcFlowConfigUpdate)
return nil
}
return fmt.Errorf(`flow config updates can only be sent when workflow is paused,
current status: %v`, state.CurrentFlowStatus)
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.CDCFlowConfigUpdate, err)
}
mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}
Expand Down Expand Up @@ -526,7 +463,7 @@ func CDCFlowWorkflowWithConfig(
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
// only process config updates when going from STATUS_PAUSED to STATUS_RUNNING
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, &mirrorNameSearch)
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, &mirrorNameSearch)
if err != nil {
return state, err
}
Expand Down Expand Up @@ -671,5 +608,5 @@ func CDCFlowWorkflowWithConfig(
}

state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}
16 changes: 4 additions & 12 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,16 @@ func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error
}

// Support a Query for the current status of the qrep flow.
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return &state.CurrentFlowState, nil
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) {
return state.CurrentFlowStatus, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}

// Support an Update for the current status of the qrep flow.
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
state.CurrentFlowState = *status
state.CurrentFlowStatus = *status
return nil
})
if err != nil {
Expand Down Expand Up @@ -427,15 +427,7 @@ func QRepFlowWorkflow(

err := setWorkflowQueries(ctx, state)
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err)
}

// Support a Query for the current status of the qrep flow.
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return &state.CurrentFlowStatus, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
return err
}

// Support an Update for the current status of the qrep flow.
Expand Down
9 changes: 5 additions & 4 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly &&
!flowConnectionConfigs.ForcePkeyChecks,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
13 changes: 1 addition & 12 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,18 +750,7 @@ impl NexusBackend {
flow_job_name,
workflow_details,
pt::peerdb_flow::FlowStatus::StatusRunning,
Some(pt::peerdb_flow::FlowConfigUpdate {
update: Some(pt::peerdb_flow::flow_config_update::Update::CdcFlowConfigUpdate(
pt::peerdb_flow::CdcFlowConfigUpdate {
additional_tables: vec![pt::peerdb_flow::TableMapping {
source_table_identifier: "public.oss2".to_string(),
destination_table_identifier: "public.oss2dst"
.to_string(),
partition_key: "".to_string(),
exclude: vec![],
}],
})),
}),
None,
)
.await
.map_err(|err| {
Expand Down
4 changes: 4 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ message FlowConnectionConfigs {
bool soft_delete = 17;
string soft_delete_col_name = 18;
string synced_at_col_name = 19;

// make InitialCopyOnly enable EnsurePullability for the use case of dynamic table addition
// TODO: find a better way to do this.
bool force_pkey_checks = 20;
}

message RenameTableOption {
Expand Down
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
syncedAtColName: '',
initialSnapshotOnly: false,
idleTimeoutSeconds: 60,
forcePkeyChecks: false,
};

export const blankQRepSetting = {
Expand Down

0 comments on commit 3080cae

Please sign in to comment.