Skip to content

Commit

Permalink
Clean up code, fetch syncbatchid/normalizebatchid together
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 25, 2023
1 parent ed67a63 commit 458abc1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 59 deletions.
7 changes: 3 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
TotalSyncFlows: 0,
ExitAfterRecords: -1,
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
Expand Down
31 changes: 14 additions & 17 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,29 +382,32 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}
}

func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return -1, err
return -1, -1, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
return 0, nil
return 0, 0, nil
}

if row[0] == nil {
c.logger.Info("no normalize_batch_id found returning 0")
return 0, nil
} else {
return row[0].(int64), nil
syncBatchID := int64(0)
normBatchID := int64(0)
if row[0] != nil {
syncBatchID = row[0].(int64)
}
if row[1] != nil {
normBatchID = row[1].(int64)
}
return syncBatchID, normBatchID, nil
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
Expand Down Expand Up @@ -746,13 +749,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

// get last batchid that has been normalize
normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
Expand All @@ -763,7 +760,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || normalizeBatchID == syncBatchID {
if !hasJob || normalizeBatchID >= syncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, err
}
// normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded.
if syncBatchID == normalizeBatchID || !jobMetadataExists {
if normalizeBatchID >= syncBatchID || !jobMetadataExists {
c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
syncBatchID, normalizeBatchID))
return &model.NormalizeResponse{
Expand Down
40 changes: 18 additions & 22 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ const (

checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
)

type tableNameComponents struct {
Expand Down Expand Up @@ -345,23 +345,23 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
return result.Int64, nil
}

func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
return 0, 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
}

var result pgtype.Int8
var syncResult, normResult pgtype.Int8
if !rows.Next() {
c.logger.Warn("No row found, returning 0")
return 0, nil
return 0, 0, nil
}
err = rows.Scan(&result)
err = rows.Scan(&syncResult, &normResult)
if err != nil {
return 0, fmt.Errorf("error while reading result row: %w", err)
return 0, 0, fmt.Errorf("error while reading result row: %w", err)
}
return result.Int64, nil
return syncResult.Int64, normResult.Int64, nil
}

func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
Expand Down Expand Up @@ -590,16 +590,12 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
// normalize has caught up with sync, chill until more records are loaded.
if syncBatchID == normalizeBatchID {
if normalizeBatchID >= syncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: normalizeBatchID,
Expand Down
22 changes: 7 additions & 15 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ type CDCFlowLimits struct {
// If 0, the number of sync flows will be continuously executed until the peer flow is cancelled.
// This is typically non-zero for testing purposes.
TotalSyncFlows int
// Number of normalize flows to execute in total.
// If 0, the number of sync flows will be continuously executed until the peer flow is cancelled.
// This is typically non-zero for testing purposes.
TotalNormalizeFlows int
// Maximum number of rows in a sync flow batch.
MaxBatchSize int
// Rows synced after which we can say a test is done.
Expand Down Expand Up @@ -443,17 +439,13 @@ func CDCFlowWorkflowWithConfig(
cfg,
)

selector := workflow.NewSelector(ctx)
selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) {
var childNormalizeFlowRes *model.NormalizeResponse
if err := f.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
})
selector.Select(ctx)
var childNormalizeFlowRes *model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
batchSizeSelector.Select(ctx)
}

Expand Down

0 comments on commit 458abc1

Please sign in to comment.