Skip to content

Commit

Permalink
Normalize concurrently with sync flows
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 18, 2024
1 parent 304841a commit af784fe
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 208 deletions.
17 changes: 5 additions & 12 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

activity.RecordHeartbeat(ctx, "initialized table schema")
slog.InfoContext(ctx, "pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
Expand Down Expand Up @@ -379,26 +378,22 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

lastSyncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
input.SyncBatchID)
return nil, err
} else if err != nil {
return nil, err
}
defer connectors.CloseConnector(dstConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, 15*time.Second, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncBatchID: input.SyncBatchID,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
Expand All @@ -423,10 +418,8 @@ func (a *FlowableActivity) StartNormalize(
}

// log the number of batches normalized
if res != nil {
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
res.StartBatchID, res.EndBatchID))
}
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
res.StartBatchID, res.EndBatchID))

return res, nil
}
Expand Down
48 changes: 21 additions & 27 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
Expand All @@ -361,35 +362,28 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}
}

func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) {
query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
q.DisableQueryCache = true
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return model.SyncAndNormalizeBatchID{}, err
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found for job")
return model.SyncAndNormalizeBatchID{}, nil
return 0, nil
}

syncBatchID := int64(0)
normBatchID := int64(0)
if row[0] != nil {
syncBatchID = row[0].(int64)
}
if row[1] != nil {
normBatchID = row[1].(int64)
return row[0].(int64), nil
}
return model.SyncAndNormalizeBatchID{
SyncBatchID: syncBatchID,
NormalizeBatchID: normBatchID,
}, nil
return 0, nil
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
Expand Down Expand Up @@ -527,7 +521,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
Expand All @@ -538,27 +532,27 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
if !hasJob || normBatchID >= req.SyncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}
distinctTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
req.SyncBatchID,
normBatchID,
)
if err != nil {
return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
req.SyncBatchID,
normBatchID,
)
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
Expand All @@ -579,8 +573,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
syncBatchID: req.SyncBatchID,
normalizeBatchID: normBatchID,
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
Expand All @@ -603,7 +597,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
c.datasetID, MirrorJobsTable, req.SyncBatchID, req.FlowJobName)

_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
if err != nil {
Expand All @@ -612,8 +606,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

Expand Down
27 changes: 12 additions & 15 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ const (
createRawTableBatchIDIndexSQL = "CREATE INDEX IF NOT EXISTS %s_batchid_idx ON %s.%s(_peerdb_batch_id)"
createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"

getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastSyncAndNormalizeBatchID_SQL = "SELECT sync_batch_id,normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
Expand Down Expand Up @@ -441,24 +441,21 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {
return result.Int64, nil
}

func (c *PostgresConnector) GetLastSyncAndNormalizeBatchID(jobName string) (*model.SyncAndNormalizeBatchID, error) {
var syncResult, normalizeResult pgtype.Int8
func (c *PostgresConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
var result pgtype.Int8
err := c.pool.QueryRow(c.ctx, fmt.Sprintf(
getLastSyncAndNormalizeBatchID_SQL,
getLastNormalizeBatchID_SQL,
c.metadataSchema,
mirrorJobsTableIdentifier,
), jobName).Scan(&syncResult, &normalizeResult)
), jobName).Scan(&result)
if err != nil {
if err == pgx.ErrNoRows {
c.logger.Info("No row found, returning 0")
return &model.SyncAndNormalizeBatchID{}, nil
return 0, nil
}
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
return &model.SyncAndNormalizeBatchID{
SyncBatchID: syncResult.Int64,
NormalizeBatchID: normalizeResult.Int64,
}, nil
return result.Int64, nil
}

func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
Expand Down
28 changes: 16 additions & 12 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return &model.SyncResponse{
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down Expand Up @@ -436,28 +438,29 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}, nil
}

batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}

// normalize has caught up with sync, chill until more records are loaded.
if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
if normBatchID >= req.SyncBatchID {
c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
batchIDs.SyncBatchID, batchIDs.NormalizeBatchID))
req.SyncBatchID, normBatchID))
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID,
EndBatchID: req.SyncBatchID,
}, nil
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
req.FlowJobName, req.SyncBatchID, normBatchID)
if err != nil {
return nil, err
}
unchangedToastColsMap, err := c.getTableNametoUnchangedCols(req.FlowJobName,
batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
req.SyncBatchID, normBatchID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,7 +499,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
normalizeStatements := normalizeStmtGen.generateNormalizeStatements()
for _, normalizeStatement := range normalizeStatements {
mergeStatementsBatch.Queue(normalizeStatement, batchIDs.NormalizeBatchID, batchIDs.SyncBatchID, destinationTableName).Exec(
mergeStatementsBatch.Queue(normalizeStatement, normBatchID, req.SyncBatchID, destinationTableName).Exec(
func(ct pgconn.CommandTag) error {
totalRowsAffected += int(ct.RowsAffected())
return nil
Expand All @@ -513,7 +516,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.logger.Info(fmt.Sprintf("normalized %d records", totalRowsAffected))

// updating metadata with new normalizeBatchID
err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID, normalizeRecordsTx)
err = c.updateNormalizeMetadata(req.FlowJobName, req.SyncBatchID, normalizeRecordsTx)
if err != nil {
return nil, err
}
Expand All @@ -525,8 +528,8 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

Expand Down Expand Up @@ -720,7 +723,8 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
func (c *PostgresConnector) ReplayTableSchemaDeltas(
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
Expand Down
Loading

0 comments on commit af784fe

Please sign in to comment.