Skip to content

Commit

Permalink
format & cleanup channel reading
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 5, 2023
1 parent 61e0824 commit ed6e356
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
Expand Down
23 changes: 11 additions & 12 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ type NormalizeFlowExecution struct {

type NormalizeFlowResult struct {
NormalizeFlowStatuses []*model.NormalizeResponse
NormalizeFlowErrors error
NormalizeFlowErrors error
}

func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

Expand All @@ -36,7 +36,7 @@ func NormalizeFlowWorkflow(
) (*NormalizeFlowResult, error) {
w := NewCDCFlowWorkflowExecution(ctx)

res := NormalizeFlowResult {}
res := NormalizeFlowResult{}

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
Expand All @@ -53,11 +53,10 @@ func NormalizeFlowWorkflow(
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

for {
var tableSchemaDeltasRecv interface{}
if !syncNormChan.Receive(ctx, &tableSchemaDeltasRecv) {
var tableSchemaDeltas []*protos.TableSchemaDelta
if !syncNormChan.Receive(ctx, &tableSchemaDeltas) {
break
}
var tableSchemaDeltas []*protos.TableSchemaDelta = tableSchemaDeltasRecv.([]*protos.TableSchemaDelta)

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltas != nil {
Expand All @@ -73,10 +72,10 @@ func NormalizeFlowWorkflow(
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
})
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
Expand All @@ -85,7 +84,7 @@ func NormalizeFlowWorkflow(
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
}
}
}
Expand Down

0 comments on commit ed6e356

Please sign in to comment.