diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 00cc8da6da..76fae03b44 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -335,6 +335,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, func (a *FlowableActivity) StartNormalize( ctx context.Context, input *protos.StartNormalizeInput, + syncBatchID int64, ) (*model.NormalizeResponse, error) { conn := input.FlowConnectionConfigs @@ -346,13 +347,8 @@ func (a *FlowableActivity) StartNormalize( } defer connectors.CloseConnector(dstConn) - lastSyncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to get last sync batch ID: %v", err) - } - err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, - lastSyncBatchID) + syncBatchID) return nil, err } else if err != nil { return nil, err @@ -377,6 +373,7 @@ func (a *FlowableActivity) StartNormalize( SoftDelete: input.FlowConnectionConfigs.SoftDelete, SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName, SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName, + SyncBatchID: syncBatchID, }) if err != nil { return nil, fmt.Errorf("failed to normalized records: %w", err) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2a6f762e3f..ac98965509 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -709,13 +709,9 @@ func (c *BigQueryConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { + syncBatchID := req.SyncBatchID rawTableName := c.getRawTableName(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) - } - // get last batchid that has been normalize normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) if err != nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 469775e49e..480b9d8d12 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -405,11 +405,9 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { + syncBatchID := req.SyncBatchID rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, err - } + normalizeBatchID, err := c.getLastNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 8fe26ce50f..4940703546 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -576,10 +576,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, err - } + syncBatchID := req.SyncBatchID + normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err diff --git a/flow/model/model.go b/flow/model/model.go index e60db328fd..360470200c 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -450,6 +450,7 @@ type NormalizeRecordsRequest struct { SoftDelete bool SoftDeleteColName string SyncedAtColName string + SyncBatchID int64 } type SyncResponse struct { @@ -469,6 +470,11 @@ type SyncResponse struct { RelationMessageMapping *RelationMessageMapping } +type NormalizeSyncSignal struct { + CurrentSyncBatchID int64 + TableSchemaDeltas []*protos.TableSchemaDelta +} + type NormalizeResponse struct { // Flag to depict if normalization is done Done bool diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index c827dd4d1d..633c50a4b2 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -56,15 +56,22 @@ func NormalizeFlowWorkflow( var stopLoop bool for { // Sequence channel checks carefully to avoid race condition; - // must check & process all schema deltas before breaking loop + // must check & process all schema deltas before breaking loop. if !stopLoop { var stopLoopVal bool stopLoop = stopLoopChan.ReceiveAsync(&stopLoopVal) && stopLoopVal } - var tableSchemaDeltas []*protos.TableSchemaDelta - received, _ := schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &tableSchemaDeltas) + var syncSignal model.NormalizeSyncSignal + var received bool + if stopLoop { + received = schemaDeltas.ReceiveAsync(&syncSignal) + } else { + received, _ = schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &syncSignal) + } + if received { + tableSchemaDeltas := syncSignal.TableSchemaDeltas if len(tableSchemaDeltas) != 0 { // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas)) @@ -95,32 +102,32 @@ func NormalizeFlowWorkflow( } } } - } else if stopLoop { - break - } else { - continue - } - s := NewNormalizeFlowExecution(ctx) + s := NewNormalizeFlowExecution(ctx) - s.logger.Info("executing normalize flow - ", cfg.FlowJobName) + s.logger.Info("executing normalize flow - ", cfg.FlowJobName) - normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - HeartbeatTimeout: 5 * time.Minute, - }) + normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 7 * 24 * time.Hour, + HeartbeatTimeout: 5 * time.Minute, + }) - // execute StartFlow on the peers to start the flow - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: cfg, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + // execute StartFlow on the peers to start the flow + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: cfg, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput, syncSignal.CurrentSyncBatchID) - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) + } else { + res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse) + } + } else if stopLoop { + break } else { - res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse) + continue } } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index a969cd0e3a..6003e46ece 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -97,7 +97,10 @@ func (s *SyncFlowExecution) executeSyncFlow( return nil, fmt.Errorf("failed to replay schema delta: %w", err) } - workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", syncRes.TableSchemaDeltas) + workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{ + CurrentSyncBatchID: syncRes.CurrentSyncBatchID, + TableSchemaDeltas: syncRes.TableSchemaDeltas, + }) return syncRes, nil }