diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 33c944f993..04a264569f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -160,7 +160,6 @@ func (a *FlowableActivity) CreateNormalizedTable( return conn.SetupNormalizedTables(config) } -// StartFlow implements StartFlow. func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 4278dd2989..3d7d2d8b5b 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -19,13 +19,13 @@ type NormalizeFlowExecution struct { type NormalizeFlowResult struct { NormalizeFlowStatuses []*model.NormalizeResponse - NormalizeFlowErrors error + NormalizeFlowErrors error } func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution { return &NormalizeFlowExecution{ - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + logger: workflow.GetLogger(ctx), } } @@ -36,7 +36,7 @@ func NormalizeFlowWorkflow( ) (*NormalizeFlowResult, error) { w := NewCDCFlowWorkflowExecution(ctx) - res := NormalizeFlowResult {} + res := NormalizeFlowResult{} normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) if err != nil { @@ -53,11 +53,10 @@ func NormalizeFlowWorkflow( ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) for { - var tableSchemaDeltasRecv interface{} - if !syncNormChan.Receive(ctx, &tableSchemaDeltasRecv) { + var tableSchemaDeltas []*protos.TableSchemaDelta + if !syncNormChan.Receive(ctx, &tableSchemaDeltas) { break } - var tableSchemaDeltas []*protos.TableSchemaDelta = tableSchemaDeltasRecv.([]*protos.TableSchemaDelta) // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. if tableSchemaDeltas != nil { @@ -73,10 +72,10 @@ func NormalizeFlowWorkflow( StartToCloseTimeout: 5 * time.Minute, }) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, - &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: cfg.Source, - TableIdentifiers: modifiedSrcTables, - }) + &protos.GetTableSchemaBatchInput{ + PeerConnectionConfig: cfg.Source, + TableIdentifiers: modifiedSrcTables, + }) var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { @@ -85,7 +84,7 @@ func NormalizeFlowWorkflow( } else { for i := range modifiedSrcTables { cfg.TableNameSchemaMapping[modifiedDstTables[i]] = - getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] } } }