From c592ca91f7fc1050d0eba44922ad5c3f954435b8 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Thu, 7 Dec 2023 15:08:33 +0000 Subject: [PATCH] Don't send 0 batch id, don't replay empty deltas --- flow/workflows/normalize_flow.go | 1 - flow/workflows/sync_flow.go | 38 ++++++++++++++++++-------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 633c50a4b2..b3fe40a25e 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -112,7 +112,6 @@ func NormalizeFlowWorkflow( HeartbeatTimeout: 5 * time.Minute, }) - // execute StartFlow on the peers to start the flow startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: cfg, } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 6003e46ece..eb8778211e 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -83,25 +83,29 @@ func (s *SyncFlowExecution) executeSyncFlow( return nil, fmt.Errorf("failed to flow: %w", err) } - replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 30 * time.Minute, - }) - replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ - FlowConnectionConfigs: config, - TableSchemaDeltas: syncRes.TableSchemaDeltas, - } - - fReplayTableSchemaDelta := workflow.ExecuteActivity(replayTableSchemaDeltaCtx, - flowable.ReplayTableSchemaDeltas, replayTableSchemaInput) - if err := fReplayTableSchemaDelta.Get(replayTableSchemaDeltaCtx, nil); err != nil { - return nil, fmt.Errorf("failed to replay schema delta: %w", err) + if syncRes.CurrentSyncBatchID != 0 { + if len(syncRes.TableSchemaDeltas) != 0 { + replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + }) + replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ + FlowConnectionConfigs: config, + TableSchemaDeltas: syncRes.TableSchemaDeltas, + } + + fReplayTableSchemaDelta := workflow.ExecuteActivity(replayTableSchemaDeltaCtx, + flowable.ReplayTableSchemaDeltas, replayTableSchemaInput) + if err := fReplayTableSchemaDelta.Get(replayTableSchemaDeltaCtx, nil); err != nil { + return nil, fmt.Errorf("failed to replay schema delta: %w", err) + } + } + + workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{ + CurrentSyncBatchID: syncRes.CurrentSyncBatchID, + TableSchemaDeltas: syncRes.TableSchemaDeltas, + }) } - workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{ - CurrentSyncBatchID: syncRes.CurrentSyncBatchID, - TableSchemaDeltas: syncRes.TableSchemaDeltas, - }) - return syncRes, nil }