Skip to content

Commit

Permalink
Don't send 0 batch id, don't replay empty deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 7, 2023
1 parent 208f02a commit 54058c0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
1 change: 0 additions & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func NormalizeFlowWorkflow(
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: cfg,
}
Expand Down
38 changes: 21 additions & 17 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,29 @@ func (s *SyncFlowExecution) executeSyncFlow(
return nil, fmt.Errorf("failed to flow: %w", err)
}

replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
})
replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{
FlowConnectionConfigs: config,
TableSchemaDeltas: syncRes.TableSchemaDeltas,
}

fReplayTableSchemaDelta := workflow.ExecuteActivity(replayTableSchemaDeltaCtx,
flowable.ReplayTableSchemaDeltas, replayTableSchemaInput)
if err := fReplayTableSchemaDelta.Get(replayTableSchemaDeltaCtx, nil); err != nil {
return nil, fmt.Errorf("failed to replay schema delta: %w", err)
if syncRes.CurrentSyncBatchID != 0 {
if len(syncRes.TableSchemaDeltas) != 0 {
replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
})
replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{
FlowConnectionConfigs: config,
TableSchemaDeltas: syncRes.TableSchemaDeltas,
}

fReplayTableSchemaDelta := workflow.ExecuteActivity(replayTableSchemaDeltaCtx,
flowable.ReplayTableSchemaDeltas, replayTableSchemaInput)
if err := fReplayTableSchemaDelta.Get(replayTableSchemaDeltaCtx, nil); err != nil {
return nil, fmt.Errorf("failed to replay schema delta: %w", err)
}
}

workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{
CurrentSyncBatchID: syncRes.CurrentSyncBatchID,
TableSchemaDeltas: syncRes.TableSchemaDeltas,
})
}

workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{
CurrentSyncBatchID: syncRes.CurrentSyncBatchID,
TableSchemaDeltas: syncRes.TableSchemaDeltas,
})

return syncRes, nil
}

Expand Down

0 comments on commit 54058c0

Please sign in to comment.