Cleanup normalize #894

Merged
merged 6 commits on Dec 26, 2023
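Summary of the changes below: the BigQuery and Snowflake connectors replace their separate GetLastSyncBatchID / GetLastNormalizeBatchID lookups with a single GetLastSyncAndNormalizeBatchID method that returns the new model.SyncAndNormalizeBatchID struct, CreateCDCFlow no longer sets TotalNormalizeFlows in CDCFlowLimits, and the "normalize has caught up with sync" check is relaxed from == to >= in the BigQuery, Postgres, and Snowflake NormalizeRecords implementations. A sketch of the resulting pattern follows the diff.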
7 changes: 3 additions & 4 deletions flow/cmd/handler.go
@@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

limits := &peerflow.CDCFlowLimits{
- TotalSyncFlows: 0,
- ExitAfterRecords: -1,
- TotalNormalizeFlows: 0,
- MaxBatchSize: maxBatchSize,
+ TotalSyncFlows: 0,
+ ExitAfterRecords: -1,
+ MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
60 changes: 34 additions & 26 deletions flow/connectors/bigquery/bigquery.go
@@ -382,29 +382,35 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}
}

- func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
- query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
+ func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) {
+ query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
- return -1, err
+ return model.SyncAndNormalizeBatchID{}, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
- return 0, nil
+ return model.SyncAndNormalizeBatchID{}, nil
}

- if row[0] == nil {
- c.logger.Info("no normalize_batch_id found returning 0")
- return 0, nil
- } else {
- return row[0].(int64), nil
- }
+ syncBatchID := int64(0)
+ normBatchID := int64(0)
+ if row[0] != nil {
+ syncBatchID = row[0].(int64)
+ }
+ if row[1] != nil {
+ normBatchID = row[1].(int64)
+ }
+ return model.SyncAndNormalizeBatchID{
+ SyncBatchID: syncBatchID,
+ NormalizeBatchID: normBatchID,
+ }, nil
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
@@ -736,13 +742,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

- syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
- if err != nil {
- return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
- }
-
- // get last batchid that has been normalize
- normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
+ batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
@@ -753,20 +753,28 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
- if !hasJob || normalizeBatchID == syncBatchID {
+ if !hasJob || batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
- StartBatchID: normalizeBatchID,
- EndBatchID: syncBatchID,
+ StartBatchID: batchIDs.NormalizeBatchID,
+ EndBatchID: batchIDs.SyncBatchID,
}, nil
}
- distinctTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID)
+ distinctTableNames, err := c.getDistinctTableNamesInBatch(
+ req.FlowJobName,
+ batchIDs.SyncBatchID,
+ batchIDs.NormalizeBatchID,
+ )
if err != nil {
return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

- tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID)
+ tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
+ req.FlowJobName,
+ batchIDs.SyncBatchID,
+ batchIDs.NormalizeBatchID,
+ )
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}
@@ -786,8 +794,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: c.tableNameSchemaMapping[tableName],
- syncBatchID: syncBatchID,
- normalizeBatchID: normalizeBatchID,
+ syncBatchID: batchIDs.SyncBatchID,
+ normalizeBatchID: batchIDs.NormalizeBatchID,
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
@@ -802,7 +810,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
- c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
+ c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
stmts = append(stmts, updateMetadataStmt)

query := strings.Join(stmts, "\n")
@@ -813,8 +821,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

return &model.NormalizeResponse{
Done: true,
- StartBatchID: normalizeBatchID + 1,
- EndBatchID: syncBatchID,
+ StartBatchID: batchIDs.NormalizeBatchID + 1,
+ EndBatchID: batchIDs.SyncBatchID,
}, nil
}

2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
@@ -423,7 +423,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, err
}
// normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded.
- if syncBatchID == normalizeBatchID || !jobMetadataExists {
+ if normalizeBatchID >= syncBatchID || !jobMetadataExists {
c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
syncBatchID, normalizeBatchID))
return &model.NormalizeResponse{
64 changes: 34 additions & 30 deletions flow/connectors/snowflake/snowflake.go
@@ -71,14 +71,14 @@ const (

checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
- checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
- getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
- setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
- getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
- getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
- dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
- deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
- checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
+ checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+ getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+ setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
+ getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+ getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+ dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
+ deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+ checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
)

type tableNameComponents struct {
@@ -345,23 +345,27 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
return result.Int64, nil
}

- func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
- rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
+ func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) {
+ rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
return model.SyncAndNormalizeBatchID{},
fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
}

- var result pgtype.Int8
+ var syncResult, normResult pgtype.Int8
if !rows.Next() {
c.logger.Warn("No row found, returning 0")
- return 0, nil
+ return model.SyncAndNormalizeBatchID{}, nil
}
- err = rows.Scan(&result)
+ err = rows.Scan(&syncResult, &normResult)
if err != nil {
return 0, fmt.Errorf("error while reading result row: %w", err)
return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error while reading result row: %w", err)
}
- return result.Int64, nil
+ return model.SyncAndNormalizeBatchID{
+ SyncBatchID: syncResult.Int64,
+ NormalizeBatchID: normResult.Int64,
+ }, nil
}

func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
@@ -590,20 +594,16 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
- syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
- if err != nil {
- return nil, err
- }
- normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
+ batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
// normalize has caught up with sync, chill until more records are loaded.
- if syncBatchID == normalizeBatchID {
+ if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
- StartBatchID: normalizeBatchID,
- EndBatchID: syncBatchID,
+ StartBatchID: batchIDs.NormalizeBatchID,
+ EndBatchID: batchIDs.SyncBatchID,
}, nil
}

@@ -617,12 +617,16 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
Done: false,
}, nil
}
- destinationTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID)
+ destinationTableNames, err := c.getDistinctTableNamesInBatch(
+ req.FlowJobName,
+ batchIDs.SyncBatchID,
+ batchIDs.NormalizeBatchID,
+ )
if err != nil {
return nil, err
}

- tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID)
+ tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
serprex marked this conversation as resolved.
if err != nil {
return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err)
}
@@ -640,7 +644,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
tableName,
tableNametoUnchangedToastCols[tableName],
getRawTableIdentifier(req.FlowJobName),
- syncBatchID, normalizeBatchID,
+ batchIDs.SyncBatchID, batchIDs.NormalizeBatchID,
req)
if err != nil {
c.logger.Error("[merge] error while normalizing records", slog.Any("error", err))
@@ -657,15 +661,15 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
}

// updating metadata with new normalizeBatchID
- err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID)
+ err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID)
if err != nil {
return nil, err
}

return &model.NormalizeResponse{
Done: true,
- StartBatchID: normalizeBatchID + 1,
- EndBatchID: syncBatchID,
+ StartBatchID: batchIDs.NormalizeBatchID + 1,
+ EndBatchID: batchIDs.SyncBatchID,
}, nil
}

5 changes: 5 additions & 0 deletions flow/model/model.go
@@ -416,6 +416,11 @@ func (r *CDCRecordStream) GetRecords() chan Record {
return r.records
}

+ type SyncAndNormalizeBatchID struct {
+ SyncBatchID int64
+ NormalizeBatchID int64
+ }

type SyncRecordsRequest struct {
Records *CDCRecordStream
// FlowJobName is the name of the flow job.
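For readers skimming the diff, here is a minimal, self-contained sketch of the pattern the connectors converge on. It is not code from this PR: fakeConnector and its in-memory metadata are hypothetical stand-ins for the BigQuery/Snowflake connectors, used only to show how the new SyncAndNormalizeBatchID struct replaces the two separate batch-ID lookups and how the catch-up check now uses >=.

```go
package main

import "fmt"

// SyncAndNormalizeBatchID mirrors the struct added to flow/model/model.go.
type SyncAndNormalizeBatchID struct {
    SyncBatchID      int64
    NormalizeBatchID int64
}

// fakeConnector is a hypothetical stand-in for a destination connector
// (BigQuery or Snowflake); it serves batch IDs from memory instead of a
// metadata table.
type fakeConnector struct {
    meta SyncAndNormalizeBatchID
}

// GetLastSyncAndNormalizeBatchID returns both IDs in one call, replacing the
// previous GetLastSyncBatchID + GetLastNormalizeBatchID pair of lookups.
func (c *fakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (SyncAndNormalizeBatchID, error) {
    return c.meta, nil
}

func main() {
    c := &fakeConnector{meta: SyncAndNormalizeBatchID{SyncBatchID: 7, NormalizeBatchID: 7}}

    batchIDs, err := c.GetLastSyncAndNormalizeBatchID("my_mirror")
    if err != nil {
        panic(err)
    }

    // The "caught up" check is now >= rather than ==.
    if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
        fmt.Println("normalize has caught up with sync, nothing to do yet")
        return
    }
    fmt.Printf("normalizing batches %d through %d\n",
        batchIDs.NormalizeBatchID+1, batchIDs.SyncBatchID)
}
```

In the real connectors the IDs come from the mirror-jobs metadata table, and, as the diff shows, a missing row or NULL column falls back to zero values.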