Skip to content

Commit

Permalink
NormalizeRecords: take batch id rather than getting from connector
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 7, 2023
1 parent 9c24475 commit babf389
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 43 deletions.
9 changes: 3 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
func (a *FlowableActivity) StartNormalize(
ctx context.Context,
input *protos.StartNormalizeInput,
syncBatchID int64,
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

Expand All @@ -389,13 +390,8 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

lastSyncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
syncBatchID)
return nil, err
} else if err != nil {
return nil, err
Expand All @@ -420,6 +416,7 @@ func (a *FlowableActivity) StartNormalize(
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
SyncBatchID: syncBatchID,
})
if err != nil {
return nil, fmt.Errorf("failed to normalized records: %w", err)
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,9 @@ func (c *BigQueryConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
syncBatchID := req.SyncBatchID
rawTableName := c.getRawTableName(req.FlowJobName)

syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

// get the last batch ID that has been normalized
normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,9 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
syncBatchID := req.SyncBatchID
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, err
}

normalizeBatchID, err := c.getLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
Expand Down
6 changes: 2 additions & 4 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
syncBatchID := req.SyncBatchID

normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ type NormalizeRecordsRequest struct {
SoftDelete bool
SoftDeleteColName string
SyncedAtColName string
SyncBatchID int64
}

type SyncResponse struct {
Expand All @@ -471,6 +472,11 @@ type SyncResponse struct {
RelationMessageMapping *RelationMessageMapping
}

// NormalizeSyncSignal is the payload the sync flow sends to the normalize
// workflow on the "SchemaDelta" signal. It bundles the batch ID of the sync
// with any schema changes from that sync, so the normalize side is told which
// batch to process rather than looking it up from the destination connector.
type NormalizeSyncSignal struct {
// CurrentSyncBatchID is taken from the sync result's CurrentSyncBatchID; the
// normalize workflow forwards it to the StartNormalize activity.
CurrentSyncBatchID int64
// TableSchemaDeltas is taken from the sync result's TableSchemaDeltas; it may
// be empty, in which case the normalize workflow skips its schema-refresh step.
TableSchemaDeltas []*protos.TableSchemaDelta
}

type NormalizeResponse struct {
// Flag to depict if normalization is done
Done bool
Expand Down
53 changes: 30 additions & 23 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,22 @@ func NormalizeFlowWorkflow(
var stopLoop bool
for {
// Sequence channel checks carefully to avoid race condition;
// must check & process all schema deltas before breaking loop
// must check & process all schema deltas before breaking loop.
if !stopLoop {
var stopLoopVal bool
stopLoop = stopLoopChan.ReceiveAsync(&stopLoopVal) && stopLoopVal
}

var tableSchemaDeltas []*protos.TableSchemaDelta
received, _ := schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &tableSchemaDeltas)
var syncSignal model.NormalizeSyncSignal
var received bool
if stopLoop {
received = schemaDeltas.ReceiveAsync(&syncSignal)
} else {
received, _ = schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &syncSignal)
}

if received {
tableSchemaDeltas := syncSignal.TableSchemaDeltas
if len(tableSchemaDeltas) != 0 {
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas))
Expand Down Expand Up @@ -95,32 +102,32 @@ func NormalizeFlowWorkflow(
}
}
}
} else if stopLoop {
break
} else {
continue
}

s := NewNormalizeFlowExecution(ctx)
s := NewNormalizeFlowExecution(ctx)

s.logger.Info("executing normalize flow - ", cfg.FlowJobName)
s.logger.Info("executing normalize flow - ", cfg.FlowJobName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})
normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: cfg,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: cfg,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput, syncSignal.CurrentSyncBatchID)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
} else {
res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse)
}
} else if stopLoop {
break
} else {
res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse)
continue
}
}

Expand Down
5 changes: 4 additions & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func (s *SyncFlowExecution) executeSyncFlow(
return nil, fmt.Errorf("failed to replay schema delta: %w", err)
}

workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", syncRes.TableSchemaDeltas)
workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", &model.NormalizeSyncSignal{
CurrentSyncBatchID: syncRes.CurrentSyncBatchID,
TableSchemaDeltas: syncRes.TableSchemaDeltas,
})

return syncRes, nil
}
Expand Down

0 comments on commit babf389

Please sign in to comment.