Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize concurrently with sync flows #893

Merged
merged 9 commits into from
Jan 25, 2024
16 changes: 5 additions & 11 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

activity.RecordHeartbeat(ctx, "initialized table schema")
slog.InfoContext(ctx, "pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
Expand Down Expand Up @@ -268,6 +267,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
}, nil
Expand Down Expand Up @@ -379,13 +379,8 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

lastSyncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get last sync batch ID: %w", err)
}

err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
input.SyncBatchID)
return nil, err
} else if err != nil {
return nil, err
Expand All @@ -399,6 +394,7 @@ func (a *FlowableActivity) StartNormalize(

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncBatchID: input.SyncBatchID,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
Expand All @@ -423,10 +419,8 @@ func (a *FlowableActivity) StartNormalize(
}

// log the number of batches normalized
if res != nil {
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
}
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))

return res, nil
}
Expand Down
48 changes: 21 additions & 27 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
Expand All @@ -382,37 +383,30 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}
}

func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) {
query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s WHERE mirror_job_name = '%s'",
func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s WHERE mirror_job_name = '%s'",
MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = c.datasetID
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return model.SyncAndNormalizeBatchID{}, err
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
return model.SyncAndNormalizeBatchID{}, nil
return 0, nil
}

syncBatchID := int64(0)
normBatchID := int64(0)
if row[0] != nil {
syncBatchID = row[0].(int64)
}
if row[1] != nil {
normBatchID = row[1].(int64)
return row[0].(int64), nil
}
return model.SyncAndNormalizeBatchID{
SyncBatchID: syncBatchID,
NormalizeBatchID: normBatchID,
}, nil
return 0, nil
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
Expand Down Expand Up @@ -546,7 +540,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
Expand All @@ -557,27 +551,27 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
if !hasJob || normBatchID >= req.SyncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}
distinctTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
req.SyncBatchID,
normBatchID,
)
if err != nil {
return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
req.SyncBatchID,
normBatchID,
)
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
Expand All @@ -599,8 +593,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
syncBatchID: req.SyncBatchID,
normalizeBatchID: normBatchID,
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
Expand All @@ -625,7 +619,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
MirrorJobsTable, req.SyncBatchID, req.FlowJobName)

query := c.client.Query(updateMetadataStmt)
query.DefaultProjectID = c.projectID
Expand All @@ -637,8 +631,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

Expand Down
27 changes: 12 additions & 15 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ const (
createRawTableBatchIDIndexSQL = "CREATE INDEX IF NOT EXISTS %s_batchid_idx ON %s.%s(_peerdb_batch_id)"
createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"

getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastSyncAndNormalizeBatchID_SQL = "SELECT sync_batch_id,normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"

upsertJobMetadataForSyncSQL = `INSERT INTO %s.%s AS j VALUES ($1,$2,$3,$4)
ON CONFLICT(mirror_job_name) DO UPDATE SET lsn_offset=GREATEST(j.lsn_offset, EXCLUDED.lsn_offset), sync_batch_id=EXCLUDED.sync_batch_id`
Expand Down Expand Up @@ -471,24 +471,21 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {
return result.Int64, nil
}

func (c *PostgresConnector) GetLastSyncAndNormalizeBatchID(jobName string) (*model.SyncAndNormalizeBatchID, error) {
var syncResult, normalizeResult pgtype.Int8
func (c *PostgresConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
var result pgtype.Int8
err := c.pool.QueryRow(c.ctx, fmt.Sprintf(
getLastSyncAndNormalizeBatchID_SQL,
getLastNormalizeBatchID_SQL,
c.metadataSchema,
mirrorJobsTableIdentifier,
), jobName).Scan(&syncResult, &normalizeResult)
), jobName).Scan(&result)
if err != nil {
if err == pgx.ErrNoRows {
c.logger.Info("No row found, returning 0")
return &model.SyncAndNormalizeBatchID{}, nil
return 0, nil
}
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
return &model.SyncAndNormalizeBatchID{
SyncBatchID: syncResult.Int64,
NormalizeBatchID: normalizeResult.Int64,
}, nil
return result.Int64, nil
}

func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
Expand Down
26 changes: 14 additions & 12 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,28 +431,29 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}, nil
}

batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

// normalize has caught up with sync, chill until more records are loaded.
if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
if normBatchID >= req.SyncBatchID {
c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
batchIDs.SyncBatchID, batchIDs.NormalizeBatchID))
req.SyncBatchID, normBatchID))
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
req.FlowJobName, req.SyncBatchID, normBatchID)
if err != nil {
return nil, err
}
unchangedToastColsMap, err := c.getTableNametoUnchangedCols(req.FlowJobName,
batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
req.SyncBatchID, normBatchID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -491,7 +492,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
normalizeStatements := normalizeStmtGen.generateNormalizeStatements()
for _, normalizeStatement := range normalizeStatements {
mergeStatementsBatch.Queue(normalizeStatement, batchIDs.NormalizeBatchID, batchIDs.SyncBatchID, destinationTableName).Exec(
mergeStatementsBatch.Queue(normalizeStatement, normBatchID, req.SyncBatchID, destinationTableName).Exec(
func(ct pgconn.CommandTag) error {
totalRowsAffected += int(ct.RowsAffected())
return nil
Expand All @@ -508,7 +509,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.logger.Info(fmt.Sprintf("normalized %d records", totalRowsAffected))

// updating metadata with new normalizeBatchID
err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID, normalizeRecordsTx)
err = c.updateNormalizeMetadata(req.FlowJobName, req.SyncBatchID, normalizeRecordsTx)
if err != nil {
return nil, err
}
Expand All @@ -520,8 +521,8 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID + 1,
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
EndBatchID: req.SyncBatchID,
}, nil
}

Expand Down Expand Up @@ -711,7 +712,8 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
func (c *PostgresConnector) ReplayTableSchemaDeltas(
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
Expand Down
Loading
Loading