Skip to content

Commit

Permalink
avoid nil dereference when sync flow canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 17, 2024
1 parent ac1c8e4 commit 6fc0f35
Showing 1 changed file with 34 additions and 35 deletions.
69 changes: 34 additions & 35 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,49 +473,48 @@ func CDCFlowWorkflowWithConfig(

w.logger.Info("Total records synced: ", totalRecordsSynced)

var tableSchemaDeltas []*protos.TableSchemaDelta
if childSyncFlowRes != nil {
tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas
}
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
if tableSchemaDeltas != nil {
modifiedSrcTables := make([]string, 0)
modifiedDstTables := make([]string, 0)
var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
modifiedDstTables := make([]string, 0, tableSchemaDeltasCount)

for _, tableSchemaDelta := range tableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}
for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
FlowName: cfg.FlowJobName,
getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
for i := range modifiedSrcTables {
state.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
FlowName: cfg.FlowJobName,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
for i := range modifiedSrcTables {
state.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
}
normalizeTableNameSchemaMapping = state.TableNameSchemaMapping
}
normalizeTableNameSchemaMapping = cfg.TableNameSchemaMapping
}
}

childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
})
childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
})
}

cdcPropertiesSelector.Select(ctx)
}
Expand Down

0 comments on commit 6fc0f35

Please sign in to comment.