
Commit

Merge branch 'main' into soft-delete-tests-bq
serprex authored Dec 25, 2023
2 parents 6d36913 + ed67a63 commit 2e7a437
Showing 21 changed files with 334 additions and 430 deletions.
77 changes: 6 additions & 71 deletions flow/connectors/bigquery/bigquery.go
@@ -783,25 +783,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
 	}

-	stmts := []string{}
+	stmts := make([]string, 0, len(distinctTableNames)+1)
 	// append all the statements to one list
 	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
 		c.datasetID, rawTableName, distinctTableNames))

-	release, err := c.grabJobsUpdateLock(req.FlowJobName)
-	if err != nil {
-		return nil, fmt.Errorf("failed to grab lock: %v", err)
-	}
-
-	defer func() {
-		err := release()
-		if err != nil {
-			c.logger.Error("failed to release lock", slog.Any("error", err))
-		}
-	}()
-
-	stmts = append(stmts, "BEGIN TRANSACTION;")
-
 	for _, tableName := range distinctTableNames {
 		mergeGen := &mergeStmtGenerator{
 			Dataset: c.datasetID,
@@ -826,11 +812,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
 		c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
 	stmts = append(stmts, updateMetadataStmt)
-	stmts = append(stmts, "COMMIT TRANSACTION;")

-	_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
+	query := strings.Join(stmts, "\n")
+	_, err = c.client.Query(query).Read(c.ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err)
+		return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
 	}

 	return &model.NormalizeResponse{
@@ -929,7 +915,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
 		c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
 	if hasJob {
 		jobStatement = fmt.Sprintf(
-			"UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';",
+			"UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
 			c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
 	}

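Note on the hunk above: wrapping the checkpoint in GREATEST makes the offset update monotonic, so a stale or replayed sync can no longer move the stored offset backwards. With hypothetical values, a metadata row at offset 100 receiving a late update with offset=GREATEST(offset,90) stays at 100, while a newer sync reporting 120 advances it to 120. The Snowflake connector below gets the same guard.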
@@ -1025,21 +1011,9 @@ func (c *BigQueryConnector) SetupNormalizedTables(
 }

 func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
-	release, err := c.grabJobsUpdateLock(jobName)
-	if err != nil {
-		return fmt.Errorf("failed to grab lock: %w", err)
-	}
-
-	defer func() {
-		err := release()
-		if err != nil {
-			c.logger.Error("failed to release lock", slog.Any("error", err))
-		}
-	}()
-
 	dataset := c.client.Dataset(c.datasetID)
 	// deleting PeerDB specific tables
-	err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
+	err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
 	if err != nil {
 		return fmt.Errorf("failed to delete raw table: %w", err)
 	}
@@ -1071,45 +1045,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
 	return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
 }

-// Bigquery doesn't allow concurrent updates to the same table.
-// we grab a lock on catalog to ensure that only one job is updating
-// bigquery tables at a time.
-// returns a function to release the lock.
-func (c *BigQueryConnector) grabJobsUpdateLock(flowJobName string) (func() error, error) {
-	c.logger.InfoContext(c.ctx,
-		fmt.Sprintf("[%s] trying to acquire BigQuery metadata table update lock", flowJobName))
-	tx, err := c.catalogPool.Begin(c.ctx)
-	if err != nil {
-		return nil, fmt.Errorf("failed to begin transaction: %w", err)
-	}
-
-	// grab an advisory lock based on the mirror jobs table hash
-	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
-	var lockStatus pgtype.Bool
-	for !lockStatus.Bool {
-		row := tx.QueryRow(c.ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1))", mjTbl)
-
-		err = row.Scan(&lockStatus)
-		if err != nil {
-			err = tx.Rollback(c.ctx)
-			return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
-		}
-
-		c.logger.InfoContext(c.ctx,
-			fmt.Sprintf(`[%s] failed to acquire BigQuery metadata table update lock,
-trying again in 5 seconds.`, flowJobName))
-		time.Sleep(5 * time.Second)
-	}
-
-	return func() error {
-		err = tx.Commit(c.ctx)
-		if err != nil {
-			return fmt.Errorf("failed to commit transaction: %w", err)
-		}
-		return nil
-	}, nil
-}
-
 func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
 	for _, renameRequest := range req.RenameTableOptions {
 		src := renameRequest.CurrentName
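For background on the deleted grabJobsUpdateLock: pg_try_advisory_xact_lock takes a transaction-scoped advisory lock that Postgres releases automatically when the transaction ends, which is why the returned release function only had to commit. A minimal sketch of that pattern, assuming a pgx v5 catalog pool; lockForUpdate is an illustrative name, and unlike the deleted loop it sleeps only when the lock was not acquired:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// lockForUpdate begins a catalog transaction and takes a transaction-scoped
// advisory lock keyed on name, retrying until acquired. Postgres releases
// the lock automatically when the returned transaction commits or rolls back.
func lockForUpdate(ctx context.Context, pool *pgxpool.Pool, name string) (pgx.Tx, error) {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	for {
		var locked bool
		err := tx.QueryRow(ctx,
			"SELECT pg_try_advisory_xact_lock(hashtext($1))", name).Scan(&locked)
		if err != nil {
			_ = tx.Rollback(ctx)
			return nil, fmt.Errorf("failed to grab lock on %s: %w", name, err)
		}
		if locked {
			return tx, nil // commit the transaction to release the lock
		}
		// only sleep when the lock was not acquired
		time.Sleep(5 * time.Second)
	}
}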
26 changes: 13 additions & 13 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 		flowJobName, dstTableName, syncBatchID),
 	)

-	// execute the statements in a transaction
-	stmts := []string{}
-	stmts = append(stmts, "BEGIN TRANSACTION;")
-	stmts = append(stmts, insertStmt)
-	stmts = append(stmts, updateMetadataStmt)
-	stmts = append(stmts, "COMMIT TRANSACTION;")
+	stmts := []string{
+		"BEGIN TRANSACTION;",
+		insertStmt,
+		updateMetadataStmt,
+		"COMMIT TRANSACTION;",
+	}
 	_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
 	if err != nil {
 		return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
@@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	)
 	bqClient := s.connector.client
 	datasetID := s.connector.datasetID
-	// Start a transaction
-	stmts := []string{"BEGIN TRANSACTION;"}

 	selector := "*"
 	if softDeleteCol != "" { // PeerDB column
@@ -150,16 +148,18 @@
 	insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
 		datasetID, dstTableName, selector, datasetID, stagingTable)

-	stmts = append(stmts, insertStmt)
-
 	insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
 	if err != nil {
 		return -1, fmt.Errorf("failed to create metadata insert statement: %v", err)
 	}
 	slog.Info("Performing transaction inside QRep sync function", flowLog)
-	stmts = append(stmts, insertMetadataStmt)
-	stmts = append(stmts, "COMMIT TRANSACTION;")
+	// Execute the statements in a transaction
+	stmts := []string{
+		"BEGIN TRANSACTION;",
+		insertStmt,
+		insertMetadataStmt,
+		"COMMIT TRANSACTION;",
+	}
 	_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
 	if err != nil {
 		return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
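Both hunks in this file converge on the same shape: collect the statements in a single slice literal and submit them as one multi-statement script, which BigQuery runs as a single job so the insert and the metadata update commit or roll back together. A minimal sketch under those assumptions (runStatements is an illustrative name, not part of this codebase):

package main

import (
	"context"
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// runStatements joins the statements into one multi-statement script, so
// everything between BEGIN TRANSACTION and COMMIT TRANSACTION is atomic.
func runStatements(ctx context.Context, client *bigquery.Client, insertStmt, metadataStmt string) error {
	stmts := []string{
		"BEGIN TRANSACTION;",
		insertStmt,
		metadataStmt,
		"COMMIT TRANSACTION;",
	}
	query := strings.Join(stmts, "\n")
	// Read submits the script and waits for it to finish.
	if _, err := client.Query(query).Read(ctx); err != nil {
		return fmt.Errorf("failed to execute statements %s: %v", query, err)
	}
	return nil
}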
19 changes: 0 additions & 19 deletions flow/connectors/postgres/cdc.go
@@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream(
 			return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
 		}
 	}
-	proposedConsumedXLogPos := consumedXLogPos

 	var standByLastLogged time.Time
 	cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
@@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream(

 	for {
 		if pkmRequiresResponse {
-			// Update XLogPos to the last processed position, we can only confirm
-			// that this is the last row committed on the destination.
-			if proposedConsumedXLogPos > consumedXLogPos {
-				p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
-				consumedXLogPos = proposedConsumedXLogPos
-				err := p.SetLastOffset(int64(consumedXLogPos))
-				if err != nil {
-					return fmt.Errorf("storing updated LSN failed: %w", err)
-				}
-			}
-
 			err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
 				pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
 			if err != nil {
@@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream(
 			if xld.WALStart > clientXLogPos {
 				clientXLogPos = xld.WALStart
 			}
-
-			if cdcRecordsStorage.IsEmpty() {
-				// given that we have no records it is safe to update the flush wal position
-				// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
-				proposedConsumedXLogPos = clientXLogPos
-				records.UpdateLatestCheckpoint(int64(clientXLogPos))
-			}
 		}
 	}
 }
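For context on the deleted branches: with the proposedConsumedXLogPos bookkeeping gone, the keepalive path acknowledges only consumedXLogPos, the last LSN confirmed applied on the destination. A rough sketch of how a primary keepalive message can be handled with pglogrepl, assuming a *pgconn.PgConn replication connection (handleKeepalive is an illustrative name, not this file's exact control flow):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// handleKeepalive parses a primary keepalive message and, when the server
// requests a reply, acknowledges only the LSN confirmed applied downstream.
func handleKeepalive(ctx context.Context, conn *pgconn.PgConn,
	msgData []byte, consumedXLogPos pglogrepl.LSN) error {
	pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msgData)
	if err != nil {
		return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
	}
	if pkm.ReplyRequested {
		// Postgres may recycle WAL up to the acknowledged LSN, so never
		// report past what the destination has durably applied.
		err = pglogrepl.SendStandbyStatusUpdate(ctx, conn,
			pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
		if err != nil {
			return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
		}
	}
	return nil
}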
12 changes: 6 additions & 6 deletions flow/connectors/snowflake/snowflake.go
@@ -39,11 +39,10 @@ const (
 	rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s"
 	createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
 	toVariantColumnName = "VAR_COLS"
-	mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID,
-	_PEERDB_TIMESTAMP,
-	TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
-	_PEERDB_UNCHANGED_TOAST_COLUMNS FROM
-	_PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
+	mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (
+	SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,
+	_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS
+	FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
 	_PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS
 	(SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
 	_PEERDB_UNCHANGED_TOAST_COLUMNS,%s
@@ -66,7 +65,8 @@ const (

 	insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"

-	updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
+	updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=?
+	WHERE MIRROR_JOB_NAME=?`
 	updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"

 	checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
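This mirrors the BigQuery change above: GREATEST(OFFSET, ?) keeps the stored offset monotonic, so a delayed retry carrying an older checkpoint cannot rewind it.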
77 changes: 0 additions & 77 deletions nexus/Cargo.lock

Some generated files are not rendered by default.

(The remaining 16 changed files are not shown.)
